我有一個(gè)在 Flink 中維護(hù)配置的用例,但我真的不知道如何處理。假設(shè)我在某處存儲(chǔ)了一些配置,并且我需要它來進(jìn)行處理。在Flink作業(yè)初始化時(shí),我想加載所有配置。這個(gè)配置也可以在Flink作業(yè)運(yùn)行期間修改,所以我必須在內(nèi)存中保存這個(gè)配置的狀態(tài),并在需要時(shí)更新它。配置的更新可以從 KafkaSource 訪問。這就是我所擁有的:我有一個(gè)函數(shù)可以加載整個(gè)配置,將其保持在某種狀態(tài)并將其與我的數(shù)據(jù)流關(guān)聯(lián):public class MyConfiguration extends RichFlatMapFunction<Row, Row>{ private transient MapState<String, MyConfObject> configuration; @Override public void open(MyConfiguration config) throws Exception{ MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>( "configuration", BasicTypeInfo.STRING_TYPE_INFO, ... ); configuration = getRuntimeContext().getMapState(descriptor); configuration.putAll(...); // Load configuration from somewhere } @Override public void flatMap(Row value, Collector<Row> out) throws Exception { MyConfObject conf = configuration.get(...); ... // Associate conf with data out.collect(value); }}我的管道看起來像這樣:DataStream<Row> dataStream = ...; // My data streamDataStream<Map<String, MyConfObject> streamConf = env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates .map(...); return dataStream .assignTimestampsAndWatermarks(...) .flatMap(new MyConfiguration()) ... //Do some processing .map(m -> { ObjectMapper objectMapper = new ObjectMapper(); String json = objectMapper.writeValueAsString(m); return json.getBytes(); });我想要的是使用配置更新流streamConf來更新平面地圖函數(shù)內(nèi)的 State 變量MyConfiguration。我怎樣才能做到這一點(diǎn) ?
1 回答

江戶川亂折騰
TA貢獻(xiàn)1851條經(jīng)驗(yàn) 獲得超5個(gè)贊
我建議您編寫一個(gè)源代碼,從 Kafka 讀取配置信息,然后通過廣播流將配置更改廣播到映射函數(shù)。映射函數(shù)將以持久狀態(tài)存儲(chǔ)完整的當(dāng)前配置,而廣播流意味著映射函數(shù)的所有實(shí)例都將獲得所有配置更改。
添加回答
舉報(bào)
0/150
提交
取消