第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當(dāng)我使用 akka 流在現(xiàn)有消費(fèi)者組中創(chuàng)建新消費(fèi)者時,如何尋求結(jié)束 kafka 主題?

當(dāng)我使用 akka 流在現(xiàn)有消費(fèi)者組中創(chuàng)建新消費(fèi)者時,如何尋求結(jié)束 kafka 主題?

浮云間 2023-02-16 16:10:18
我有一個 akka 應(yīng)用程序(在 JAVA 中),用于commitablePartitionedSource使用來自 kafka 主題的消息。我有幾個消費(fèi)者群體,可以為多個主題吸引消費(fèi)者。這是由動態(tài)配置驅(qū)動的,我可以在其中暫時關(guān)閉消費(fèi)者,并可能在稍后重新啟動它們。當(dāng)這個消費(fèi)者重新啟動時,我只想閱讀新消息,而不是從我離開的地方開始。有沒有辦法從 akka-alpakka 消費(fèi)者那里獲取 kafkaConsumer 對象,以便我可以在處理之前使用 seekToEnd()?請讓我知道是否還有其他方法可以實現(xiàn)這一目標(biāo)?也許使用 akka 配置或不同類型的消費(fèi)者?我不想維護(hù)自己的偏移量(希望不是唯一的選擇)我的配置設(shè)置為latest在我啟動消費(fèi)者組時獲取偏移量,但由于我正在關(guān)閉并重新啟動單個消費(fèi)者,它總是從我停止的地方開始消費(fèi)。我嘗試為一個主題創(chuàng)建一個消費(fèi)者組,但我有很多主題,而且結(jié)果非常耗費(fèi)資源。我還尋找一種方法來清除存儲在 kafka 中的該主題的偏移量,但沒有成功。
查看完整描述

2 回答

?
叮當(dāng)貓咪

TA貢獻(xiàn)1776條經(jīng)驗 獲得超12個贊

最簡單的方法就是在每次啟動重新啟動消費(fèi)者時創(chuàng)建一個新的消費(fèi)者組。Kafka 將在可配置的時間量 ( retention.ms ) 后負(fù)責(zé)刪除陳舊的消費(fèi)者組。

如果您很少重啟消費(fèi)者并且總是希望它處理新數(shù)據(jù)而不是趕上所有丟失的消息,則此策略很好。

編輯

據(jù)我所知,訪問底層 KafkaConsumer 的唯一方法是使用committableExternalSource. 這樣您就可以訪問該seekToEnd方法,但是您還需要注意訂閱提供每個分區(qū)的起始偏移量的主題(類似于您現(xiàn)在在AkkacommittablePartitionedSource之外設(shè)置的方式)。


查看完整回答
反對 回復(fù) 2023-02-16
?
明月笑刀無情

TA貢獻(xiàn)1828條經(jīng)驗 獲得超4個贊

commitablePartitionedSource將AutoSubscription其作為輸入,您不能指定偏移量。


您需要的是一種采用ManualSubscription或更高級別的方法Subscription,例如


> plainExternalSource

> committableExternalSource

> plainSource

...


查看完整回答
反對 回復(fù) 2023-02-16
  • 2 回答
  • 0 關(guān)注
  • 145 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號