第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何使用 Java 從火花中的卡夫卡讀取流嵌套的 JSON

如何使用 Java 從火花中的卡夫卡讀取流嵌套的 JSON

湖上湖 2022-09-14 10:29:12
我正在嘗試使用Java從卡夫卡中讀取復(fù)雜的嵌套JSON數(shù)據(jù),并且在形成數(shù)據(jù)集時遇到麻煩發(fā)送到卡夫卡的實際 JSON 文件{"sample_title": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}{"sample_title2": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}{"sample_title3": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}Dataset<Row> df = spark.readStream().format("kafka")                    .option("spark.local.dir", config.getString(PropertyKeys.SPARK_APPLICATION_TEMP_LOCATION.getCode()))                    .option("kafka.bootstrap.servers",                            config.getString(PropertyKeys.KAFKA_BOORTSTRAP_SERVERS.getCode()))                    .option("subscribe", config.getString(PropertyKeys.KAFKA_TOPIC_IPE_STP.getCode()))                    .option("startingOffsets", "earliest")                    .option("spark.default.parallelism",                            config.getInt(PropertyKeys.SPARK_APPLICATION_DEFAULT_PARALLELISM_VALUE.getCode()))                    .option("spark.sql.shuffle.partitions",                            config.getInt(PropertyKeys.SPARK_APPLICATION_SHUFFLE_PARTITIONS_COUNT.getCode()))                    .option("kafka.security.protocol", config.getString(PropertyKeys.SECURITY_PROTOCOL.getCode()))val output =  df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING()).filter(x -> x.contains("sample_title"));由于我可以在輸入中有多個架構(gòu),因此代碼應(yīng)該能夠處理它并根據(jù)標(biāo)題進(jìn)行過濾并映射到Title類型的數(shù)據(jù)集
查看完整描述

1 回答

?
楊__羊羊

TA貢獻(xiàn)1943條經(jīng)驗 獲得超7個贊

首先使類標(biāo)題成為java bean類,即編寫獲取器和設(shè)置器。


    public class Title implements Serializable {

        String txn_date;

        Timestamp timestamp;

        String txn_type;

        String txn_rcvd_time;

        String txn_ref;

        String txn_status;

        public Title(String data){... //set values for fields with the data}

        // add all getters and setters for fields

    }


    Dataset<Title> resultdf = df.selectExpr("CAST(value AS STRING)").map(value -> new Title(value), Encoders.bean(Title.class))

resultdf.filter(title -> // apply any predicate on title)

如果要先篩選數(shù)據(jù),然后應(yīng)用編碼,


    df.selectExpr("CAST(value AS STRING)")

.filter(get_json_object(col("value"), "$.sample_title").isNotNull)

// for simple filter use, .filter(t-> t.contains("sample_title"))

.map(value -> new Title(value), Encoders.bean(Title.class))


查看完整回答
反對 回復(fù) 2022-09-14
  • 1 回答
  • 0 關(guān)注
  • 140 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號