只是為了澄清,我是卡夫卡的新手,很抱歉,如果我的問題似乎沒有記錄,我正在閱讀教程、文檔和我能理解的一切。我試圖從 GlobalStore 讀取所有值以更新其值,然后使用已存在的 StateStore 來放置這些新的更新值。我正在嘗試這樣做,因為當我這樣做時:this.stateStore.all();我只有1/10的數(shù)據(jù),如果我理解正確的話,這是因為我有10個分區(qū),而ss,只讀取一個(雖然我不太明白為什么)這是我的全局表: public StreamsBuilder declareTopology(StreamsBuilder builder) { logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}", getInputTopic(), getDataTopic(), getToEsTopic()); builder.globalTable( getDataTopic(), Consumed.with(Serdes.String(), fooSerdes) .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST), Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as( "foosktable") .withKeySerde(Serdes.String()) .withValueSerde(fooSerdes) .withLoggingEnabled(new HashMap<>())); ...這是 addStateStore,我無法刪除它,因為它在代碼的其他地方使用: ... builder.addStateStore( Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("foosktable"), Serdes.String(), fooSerdes)); ... return builder;}因此,從理論上講,我想做的是刪除也使用相同主題的 StateStore,并使用我的 data.process 主題之一放置我的數(shù)據(jù),問題是該處理器使用此 StateStore 執(zhí)行其他操作,所以我不能用核武器攻擊它。我在這里迷路了,任何光都會有很大幫助。謝謝 !
1 回答

胡說叔叔
TA貢獻1804條經(jīng)驗 獲得超8個贊
有點不清楚你真正想要實現(xiàn)的目標是什么。然而,一些高級解釋:
AGlobalKTable
只有一個目的:從主題讀取數(shù)據(jù)而不進行修改,以允許執(zhí)行連接KStream-GlobalKTable
或通過“交互式查詢”查詢存儲。
因此,您無法真正執(zhí)行您想要的操作,因為無法按照您的意圖將數(shù)據(jù)從全局存儲復制到另一個存儲。您需要復制輸入主題并讀取兩次:(1) 和GlobalKTable
(2) 常規(guī),KStream
以在將數(shù)據(jù)放入商店之前修改數(shù)據(jù)。對于 (2),您可以使用transform()
.
希望這可以幫助。
添加回答
舉報
0/150
提交
取消