我有類似的json,{ "name":"someone", "job":"doctor", "etc":"etc"}在每個(gè) json 中,“工作”都有不同的值,比如醫(yī)生、飛行員、司機(jī)、守望者等。我想根據(jù)“工作”值分離每個(gè) json,并將其存儲(chǔ)在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已經(jīng)嘗試過(guò) SplitStream 函數(shù)來(lái)執(zhí)行此操作,但我必須指定這些值以匹配條件。public class MyFlinkJob { private static JsonParser jsonParser = new JsonParser(); private static String key_1 = "doctor"; private static String key_2 = "driver"; private static String key_3 = "pilot"; private static String key_default = "default"; public static void main(String args[]) throws Exception { Properties prop = new Properties(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", kafka); props.setProperty("group.id", "myjob"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props); DataStream<String> record = env.addSource(myConsumer).rebalance() SplitStream<String> split = record.split(new OutputSelector<String>() { @Override public Iterable<String> select(String val) { JsonObject json = (JsonObject)jsonParser.parse(val); String jsonValue = CommonFields.getFieldValue(json, "job"); List<String> output = new ArrayList<String>(); if (key_1.equalsIgnoreCase(jsonValue)) { } output.add("doctor"); } else if (key_2.equalsIgnoreCase(jsonValue)) { output.add("driver"); } else if (key_3.equalsIgnoreCase(jsonValue)) { output.add("pilot"); } else { output.add("default"); } return output; }});}假設(shè)如果任何其他值出現(xiàn)在“job”中,比如工程師或其他東西,并且我沒(méi)有在類中指定,那么它會(huì)轉(zhuǎn)到默認(rèn)文件夾有沒(méi)有辦法根據(jù)“job”的值自動(dòng)拆分這些 json 事件而不指定它和創(chuàng)建一個(gè)包含值名稱的路徑,例如 /home/enginerr?
如何根據(jù)json中的特定鍵將接收器從一個(gè)數(shù)據(jù)流添加到不同的路徑?
哆啦的時(shí)光機(jī)
2022-10-20 17:24:25