我使用以下 JSON 結(jié)構(gòu)處理來自 Kafka 的消息:{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}我想打印出我收到的內(nèi)容。這是我已經(jīng)完成的代碼片段:JavaSparkContext sc = createJavaSparkContext();JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));SparkSession sparkSession = SparkSession .builder() .config(new SparkConf()) .getOrCreate();Dataset<Row> df = sparkSession .readStream() .format("kafka") .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT) .option("subscribe", KAFKA_TOPIC) .load();StreamingQuery query = df.selectExpr("CAST(value AS STRING)") .select(from_json(new Column("value"), getSchema())).as("data"). select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() { @Override public void process(Row value) { System.out.println(value); } @Override public void close(Throwable errorOrNull) { } @Override public boolean open(long partitionId, long version) { return true; } }) .start(); query.awaitTermination();架構(gòu)方法:private static StructType getSchema() { return new StructType(new StructField[]{ new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()), new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()), new StructField(IP, DataTypes.StringType, false, Metadata.empty()), new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()), });}如何克服這個問題?對此有何建議?
1 回答

慕虎7371278
TA貢獻(xiàn)1802條經(jīng)驗 獲得超4個贊
異常的這一部分準(zhǔn)確地告訴您在哪里尋找答案:
無法解析給定輸入列的“data.category_id”:[jsontostruct(value)]
換句話說,data.category_id
可用列中沒有一列只是 1 列jsontostruct(value)
。
這意味著僅select
在流式查詢中不起作用。原因相當(dāng)簡單(我可以將其視為拼寫錯誤)——在Column和Datasetas("data")
類型上可用的右括號太多。
總之,替換查詢的以下部分:
.select(from_json(new?Column("value"),?getSchema())).as("data")
至以下內(nèi)容:
.select(from_json(new?Column("value"),?getSchema()).as("data"))
請注意,我將一個右括號移到了末尾。
添加回答
舉報
0/150
提交
取消