第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

初始化MapState的內(nèi)容

初始化MapState的內(nèi)容

qq_遁去的一_1 2023-10-12 14:39:29
我實(shí)現(xiàn)了一個(gè)RichFunction具有以下結(jié)構(gòu)的 Flink:public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {    private MapState<String, MyState> myState;                  @Override    public void open(Configuration conf)throws Exception{        myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));    }    @Override    public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {        MyState state = myState.get(value.ID());        // Do things    }    @Override    public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {        state.put(value.ID(), value.state());   // Update the mapState with value from broadcast    }    // retrieve all the state values and put them in the MapState    private void initialState() throws Exception{       Map<String, MyState> initialValues = ...;       this.cameras.putAll(initialValues);    }}該mapState變量存儲(chǔ)通過(guò)BroadcastedStream. 更新是在processBroadcastElement()函數(shù)中完成的。在作業(yè)開(kāi)始時(shí),我想mapState使用該initialState()函數(shù)來(lái)初始化。問(wèn)題是我無(wú)法在函數(shù)中使用它open()(請(qǐng)參閱此處原因)在這種情況下初始化的正確方法是什么mapState?(在所有使用 RichFunctions 的情況下)
查看完整描述

2 回答

?
子衿沉夜

TA貢獻(xiàn)1828條經(jīng)驗(yàn) 獲得超3個(gè)贊

您想要實(shí)現(xiàn) org.apache.flink.streaming.api.checkpoint.CheckpointedFunction


當(dāng)您這樣做時(shí),您將實(shí)現(xiàn)兩種方法:


@Override

public void snapshotState(FunctionSnapshotContext context) throws Exception {


    // called when it's time to save state


    myState.clear();


        // Update myState with current application state 


}


@Override

public void initializeState(FunctionInitializationContext context) throws Exception {


    // called when things start up, possibly recovering from an error


    descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));


    myState = context.getKeyedStateStore().getMapState(descriptor);


    if (context.isRestored()) {


        // restore application state from myState  


    }       


}

您可以在initializeState() 方法而不是open() 中初始化myState 變量。


查看完整回答
反對(duì) 回復(fù) 2023-10-12
?
梵蒂岡之花

TA貢獻(xiàn)1900條經(jīng)驗(yàn) 獲得超5個(gè)贊

我不相信你實(shí)際上可以在initializeState()中初始化廣播狀態(tài)。修改廣播狀態(tài)的唯一方法是通過(guò)在 processBroadcastElement 方法中獲得的讀/寫上下文。

但是你可以做的是在initializeState中使用context.isRestored()來(lái)確定KeyedBroadcastProcessFunction是否是第一次初始化,并設(shè)置一個(gè)瞬態(tài)局部變量來(lái)記錄此信息。然后,第一次調(diào)用 processBroadcastElement 方法時(shí),您可以使用此信息來(lái)決定在廣播狀態(tài)中存儲(chǔ)什么。但您必須在廣播流上發(fā)送一些內(nèi)容才能啟動(dòng)此操作。


查看完整回答
反對(duì) 回復(fù) 2023-10-12
  • 2 回答
  • 0 關(guān)注
  • 114 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)