第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

如何根據(jù)json中的特定鍵將接收器從一個(gè)數(shù)據(jù)流添加到不同的路徑?

如何根據(jù)json中的特定鍵將接收器從一個(gè)數(shù)據(jù)流添加到不同的路徑?

哆啦的時(shí)光機(jī) 2022-10-20 17:24:25
我有類似的json,{  "name":"someone",  "job":"doctor",  "etc":"etc"}在每個(gè) json 中,“工作”都有不同的值,比如醫(yī)生、飛行員、司機(jī)、守望者等。我想根據(jù)“工作”值分離每個(gè) json,并將其存儲(chǔ)在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已經(jīng)嘗試過(guò) SplitStream 函數(shù)來(lái)執(zhí)行此操作,但我必須指定這些值以匹配條件。public class MyFlinkJob {       private static JsonParser jsonParser = new JsonParser();    private static String key_1 = "doctor";    private static String key_2 = "driver";    private static String key_3 = "pilot";    private static String key_default = "default";    public static void main(String args[]) throws Exception {        Properties prop = new Properties();        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties props = new Properties();        props.setProperty("bootstrap.servers", kafka);        props.setProperty("group.id", "myjob");        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);        DataStream<String> record = env.addSource(myConsumer).rebalance()        SplitStream<String> split = record.split(new OutputSelector<String>() {            @Override            public Iterable<String> select(String val) {                JsonObject json = (JsonObject)jsonParser.parse(val);                String jsonValue = CommonFields.getFieldValue(json, "job");                List<String> output = new ArrayList<String>();                if (key_1.equalsIgnoreCase(jsonValue)) {            }                    output.add("doctor");                } else if (key_2.equalsIgnoreCase(jsonValue)) {                    output.add("driver");                } else if (key_3.equalsIgnoreCase(jsonValue)) {                    output.add("pilot");                } else {                    output.add("default");                }                return output;            }});}假設(shè)如果任何其他值出現(xiàn)在“job”中,比如工程師或其他東西,并且我沒(méi)有在類中指定,那么它會(huì)轉(zhuǎn)到默認(rèn)文件夾有沒(méi)有辦法根據(jù)“job”的值自動(dòng)拆分這些 json 事件而不指定它和創(chuàng)建一個(gè)包含值名稱的路徑,例如 /home/enginerr?
查看完整描述

1 回答

?
守候你守候我

TA貢獻(xiàn)1802條經(jīng)驗(yàn) 獲得超10個(gè)贊

您想使用BucketingSink,它支持根據(jù)字段的值將記錄寫入單獨(dú)的存儲(chǔ)桶。我可能有一個(gè) map 函數(shù),它接收 JSON 字符串,對(duì)其進(jìn)行解析并發(fā)出 a Tuple2<String, String>,其中第一個(gè)元素是jobJSON 中字段的值,第二個(gè)元素是完整的 JSON 字符串。



查看完整回答
反對(duì) 回復(fù) 2022-10-20
  • 1 回答
  • 0 關(guān)注
  • 105 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)