第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定

StreamingPro添加Scala script 模塊支持

SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。

我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:

"udf_register": {    "desc": "测试",    "strategy": "spark",    "algorithm": [],    "ref": [],    "compositor": [
      {        "name": "sql.udf",        "params": [
          {            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
      }
    ]
  }

这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。

这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。

核心是ScriptCompositor模块:

 {        "name": "batch.script",        "params": [
          {            "inputTableName": "test",            "outputTableName": "test3"
          },
          {            "raw": [              "val Array(a,b)=rawLine.split(\"\t\");",              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      }

如果我想在代码里直接处理所有的列,则如下:

{        "name": "batch.script",        "params": [
          {            "inputTableName": "test2",            "outputTableName": "test3",            "useDocMap": true
          },
          {            "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
          }
        ]
}

通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。

如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。

你可以把代码放到一个文件里,如下:

{        "name": "batch.script",        "params": [
          {            "inputTableName": "test",            "outputTableName": "test3"
          },
          {            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。

这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:

{        "name": "batch.script",        "params": [
          {            "inputTableName": "test",            "outputTableName": "test3",            "schema": "file:///tmp/schema.scala",            "useDocMap": true
          },
          {            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }

schema.scala的内容大致如下:

Some(StructType(Array(StructField("a", StringType, true),StructField("b", StringType, true)))
)

后续roadmap是:

  1. 支持外部脚本,比如放在hdfs或者http服务器上。

  2. 支持java 脚本

  3. 支持javascript脚本

  4. 支持 python 脚本

  5. 支持 ruby脚本

  6. 支持 groovy 脚本

举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。

{  "convert_data_parquet": {    "desc": "测试",    "strategy": "spark",    "algorithm": [],    "ref": [],    "compositor": [
      {        "name": "batch.sources",        "params": [
          {            "path": "file:///tmp/hdfsfile",            "format": "org.apache.spark.sql.execution.datasources.hdfs",            "fieldName": "raw",            "outputTableName":"test"
          }
        ]
      },     
      {        "name": "batch.script",        "params": [
          {            "inputTableName": "test",            "outputTableName": "test3"
          },
          {            "raw": [              "val Array(a,b)=rawLine.split(\"\t\");",              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      },
      {        "name": "batch.sql",        "params": [
          {            "sql": "select a,b  from test3 "
          }
        ]
      },
      {        "name": "batch.outputs",        "params": [
          {           "format":"console"
          }
        ]
      }
    ],    "configParams": {
    }
  }
}

         

             




作者:祝威廉
链接:https://www.jianshu.com/p/b33c36cd3481


點擊查看更多內(nèi)容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優(yōu)惠券免費領(lǐng)

立即參與 放棄機會
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消