我有類似的json,{ "name":"someone", "job":"doctor", "etc":"etc"}在每個 json 中,“工作”都有不同的值,比如醫(yī)生、飛行員、司機、守望者等。我想根據“工作”值分離每個 json,并將其存儲在不同的位置,如,/home/doctor等。/home/pilot/home/driver我已經嘗試過 SplitStream 函數來執(zhí)行此操作,但我必須指定這些值以匹配條件。public class MyFlinkJob { private static JsonParser jsonParser = new JsonParser(); private static String key_1 = "doctor"; private static String key_2 = "driver"; private static String key_3 = "pilot"; private static String key_default = "default"; public static void main(String args[]) throws Exception { Properties prop = new Properties(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", kafka); props.setProperty("group.id", "myjob"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props); DataStream<String> record = env.addSource(myConsumer).rebalance() SplitStream<String> split = record.split(new OutputSelector<String>() { @Override public Iterable<String> select(String val) { JsonObject json = (JsonObject)jsonParser.parse(val); String jsonValue = CommonFields.getFieldValue(json, "job"); List<String> output = new ArrayList<String>(); if (key_1.equalsIgnoreCase(jsonValue)) { } output.add("doctor"); } else if (key_2.equalsIgnoreCase(jsonValue)) { output.add("driver"); } else if (key_3.equalsIgnoreCase(jsonValue)) { output.add("pilot"); } else { output.add("default"); } return output; }});}假設如果任何其他值出現在“job”中,比如工程師或其他東西,并且我沒有在類中指定,那么它會轉到默認文件夾有沒有辦法根據“job”的值自動拆分這些 json 事件而不指定它和創(chuàng)建一個包含值名稱的路徑,例如 /home/enginerr?
1 回答
守候你守候我
TA貢獻1802條經驗 獲得超10個贊
您想使用BucketingSink,它支持根據字段的值將記錄寫入單獨的存儲桶。我可能有一個 map 函數,它接收 JSON 字符串,對其進行解析并發(fā)出 a Tuple2<String, String>,其中第一個元素是jobJSON 中字段的值,第二個元素是完整的 JSON 字符串。
添加回答
舉報
0/150
提交
取消
