My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. I can save the JSON to HDFS, but every record is wrapped under the key "jsontostructs(CAST(value AS STRING))", as below:

    {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}

How can I save only {"age":42,"name":"John"}?

    StructType schema = kafkaPrimerRow.schema();

    // Read JSON from Kafka. The JSON is: {"age":42,"name":"John"}
    Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", input_bootstrap_server)
        .option("subscribe", topics[0])
        .load();

    // Save the stream to HDFS
    StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();

1 Answer

BIG陽
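Giving the from_json result an alias (here "data") and then adding .select("data.*") does the trick: the second select expands the fields of the struct into top-level columns, so the auto-generated column name no longer appears as a key in the output.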
    StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
        .select("data.*")
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();
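
To try the same flattening without Kafka or HDFS, here is a minimal batch sketch. It rests on assumptions that are not in the question: the class name FlattenFromJsonDemo and the helper flatten are made up for illustration, the schema is written out by hand instead of being taken from kafkaPrimerRow, and a local in-memory Dataset with one string column named "value" stands in for the Kafka stream. It needs the spark-sql dependency on the classpath.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class FlattenFromJsonDemo {

    // Hypothetical helper: parses each JSON string and returns the flattened rows as JSON text.
    static List<String> flatten(SparkSession spark, List<String> jsonLines) {
        // Assumed schema matching the sample record {"age":42,"name":"John"}.
        StructType schema = new StructType()
                .add("age", DataTypes.IntegerType)
                .add("name", DataTypes.StringType);

        // One string column named "value", standing in for the Kafka value column.
        Dataset<Row> df = spark.createDataset(jsonLines, Encoders.STRING()).toDF("value");

        Dataset<Row> flat = df
                // from_json yields a single struct column; give it a short alias.
                .select(from_json(col("value"), schema).as("data"))
                // Expand the struct's fields into top-level columns.
                .select("data.*");

        return flat.toJSON().collectAsList();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("FlattenFromJsonDemo")
                .getOrCreate();
        try {
            List<String> out = flatten(spark, Arrays.asList("{\"age\":42,\"name\":\"John\"}"));
            out.forEach(System.out::println);
        } finally {
            spark.stop();
        }
    }
}
```

If the second select is left out, the struct stays nested under the single column "data", which is the same wrapping the question shows, only with a shorter key.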