2 回答

TA貢獻(xiàn)1828條經(jīng)驗(yàn) 獲得超3個(gè)贊
您想要實(shí)現(xiàn) org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
當(dāng)您這樣做時(shí),您將實(shí)現(xiàn)兩種方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when it's time to save state
myState.clear();
// Update myState with current application state
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called when things start up, possibly recovering from an error
descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));
myState = context.getKeyedStateStore().getMapState(descriptor);
if (context.isRestored()) {
// restore application state from myState
}
}
您可以在initializeState() 方法而不是open() 中初始化myState 變量。

TA貢獻(xiàn)1900條經(jīng)驗(yàn) 獲得超5個(gè)贊
我不相信你實(shí)際上可以在initializeState()中初始化廣播狀態(tài)。修改廣播狀態(tài)的唯一方法是通過(guò)在 processBroadcastElement 方法中獲得的讀/寫上下文。
但是你可以做的是在initializeState中使用context.isRestored()來(lái)確定KeyedBroadcastProcessFunction是否是第一次初始化,并設(shè)置一個(gè)瞬態(tài)局部變量來(lái)記錄此信息。然后,第一次調(diào)用 processBroadcastElement 方法時(shí),您可以使用此信息來(lái)決定在廣播狀態(tài)中存儲(chǔ)什么。但您必須在廣播流上發(fā)送一些內(nèi)容才能啟動(dòng)此操作。
添加回答
舉報(bào)