第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何根據json中的特定鍵將接收器從一個數據流添加到不同的路徑?

如何根據json中的特定鍵將接收器從一個數據流添加到不同的路徑?

哆啦的時光機 2022-10-20 17:24:25
我有類似的json,{  "name":"someone",  "job":"doctor",  "etc":"etc"}在每個 json 中,“工作”都有不同的值,比如醫(yī)生、飛行員、司機、守望者等。我想根據“工作”值分離每個 json,并將其存儲在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已經嘗試過 SplitStream 函數來執(zhí)行此操作,但我必須指定這些值以匹配條件。public class MyFlinkJob {       private static JsonParser jsonParser = new JsonParser();    private static String key_1 = "doctor";    private static String key_2 = "driver";    private static String key_3 = "pilot";    private static String key_default = "default";    public static void main(String args[]) throws Exception {        Properties prop = new Properties();        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties props = new Properties();        props.setProperty("bootstrap.servers", kafka);        props.setProperty("group.id", "myjob");        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);        DataStream<String> record = env.addSource(myConsumer).rebalance()        SplitStream<String> split = record.split(new OutputSelector<String>() {            @Override            public Iterable<String> select(String val) {                JsonObject json = (JsonObject)jsonParser.parse(val);                String jsonValue = CommonFields.getFieldValue(json, "job");                List<String> output = new ArrayList<String>();                if (key_1.equalsIgnoreCase(jsonValue)) {            }                    output.add("doctor");                } else if (key_2.equalsIgnoreCase(jsonValue)) {                    output.add("driver");                } else if (key_3.equalsIgnoreCase(jsonValue)) {                    output.add("pilot");                } else {                    output.add("default");                }                return output;            }});}假設如果任何其他值出現在“job”中,比如工程師或其他東西,并且我沒有在類中指定,那么它會轉到默認文件夾有沒有辦法根據“job”的值自動拆分這些 json 事件而不指定它和創(chuàng)建一個包含值名稱的路徑,例如 /home/enginerr?
查看完整描述

1 回答

?
守候你守候我

TA貢獻1802條經驗 獲得超10個贊

您想使用BucketingSink,它支持根據字段的值將記錄寫入單獨的存儲桶。我可能有一個 map 函數,它接收 JSON 字符串,對其進行解析并發(fā)出 a Tuple2<String, String>,其中第一個元素是jobJSON 中字段的值,第二個元素是完整的 JSON 字符串。



查看完整回答
反對 回復 2022-10-20
  • 1 回答
  • 0 關注
  • 122 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號