第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將 JSON 保存到 HDFS 的結(jié)構(gòu)化流

將 JSON 保存到 HDFS 的結(jié)構(gòu)化流

繁星淼淼 2023-04-26 14:18:58
我的 Structured Spark Streaming 程序是從 Kafka 讀取 JSON 數(shù)據(jù)并以 JSON 格式寫入 HDFS。我能夠?qū)?JSON 保存到 HDFS,但它保存了 JSON 字符串: "jsontostructs(CAST(value AS STRING))"key as below: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}.如何只保存{"age":42,"name":"John"}?StructType schema = kafkaPrimerRow.schema();//Read json from kafka. JSON is: {"age":42,"name":"John"}Dataset<Row> df = spark                    .readStream()                    .format("kafka")                    .option("kafka.bootstrap.servers", input_bootstrap_server)                    .option("subscribe", topics[0])                    .load();    //Save Stream to HDFS    StreamingQuery ds = df             .select(functions.from_json(col("value").cast(DataTypes.StringType),schema)) .writeStream().format("json").outputMode(OutputMode.Append()).option("path", destPath).option("checkpointLocation", checkpoint).start();
查看完整描述

1 回答

?
BIG陽

TA貢獻(xiàn)1859條經(jīng)驗 獲得超6個贊

以下 .select("data.*") 達(dá)到了目的。

StreamingQuery ds = df
                        .select(functions.from_json(col("value").cast(DataTypes.StringType),schema).as("data"))
                        .select("data.*")
                        .writeStream()
                        .format("json")
                        .outputMode(OutputMode.Append())
                        .option("path", destPath)
                        .option("checkpointLocation", checkpoint)
                        .start();


查看完整回答
反對 回復(fù) 2023-04-26
  • 1 回答
  • 0 關(guān)注
  • 200 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號