第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在流查詢中使用 from_json 標(biāo)準(zhǔn)函數(shù)(在 select 中)?

如何在流查詢中使用 from_json 標(biāo)準(zhǔn)函數(shù)(在 select 中)?

搖曳的薔薇 2023-07-19 16:32:33
我使用以下 JSON 結(jié)構(gòu)處理來自 Kafka 的消息:{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}我想打印出我收到的內(nèi)容。這是我已經(jīng)完成的代碼片段:JavaSparkContext sc = createJavaSparkContext();JavaStreamingContext streamingContext =                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));SparkSession sparkSession = SparkSession        .builder()        .config(new SparkConf())        .getOrCreate();Dataset<Row> df = sparkSession        .readStream()        .format("kafka")        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)        .option("subscribe", KAFKA_TOPIC)        .load();StreamingQuery query = df.selectExpr("CAST(value AS STRING)")            .select(from_json(new Column("value"), getSchema())).as("data").                    select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {                @Override                public void process(Row value) {                    System.out.println(value);                }                @Override                public void close(Throwable errorOrNull) {                }                @Override                public boolean open(long partitionId, long version) {                    return true;                }            })            .start();    query.awaitTermination();架構(gòu)方法:private static StructType getSchema() {    return new StructType(new StructField[]{            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),    });}如何克服這個問題?對此有何建議?
查看完整描述

1 回答

?
慕虎7371278

TA貢獻(xiàn)1802條經(jīng)驗 獲得超4個贊

異常的這一部分準(zhǔn)確地告訴您在哪里尋找答案:

無法解析給定輸入列的“data.category_id”:[jsontostruct(value)]

換句話說,data.category_id可用列中沒有一列只是 1 列jsontostruct(value)。

這意味著僅select在流式查詢中不起作用。原因相當(dāng)簡單(我可以將其視為拼寫錯誤)——在Column和Datasetas("data")類型上可用的右括號太多。

總之,替換查詢的以下部分:

.select(from_json(new?Column("value"),?getSchema())).as("data")

至以下內(nèi)容:

.select(from_json(new?Column("value"),?getSchema()).as("data"))

請注意,我將一個右括號移到了末尾。


查看完整回答
反對 回復(fù) 2023-07-19
  • 1 回答
  • 0 關(guān)注
  • 130 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號