2 回答

TA貢獻(xiàn)1776條經(jīng)驗 獲得超12個贊
最簡單的方法就是在每次啟動重新啟動消費(fèi)者時創(chuàng)建一個新的消費(fèi)者組。Kafka 將在可配置的時間量 ( retention.ms ) 后負(fù)責(zé)刪除陳舊的消費(fèi)者組。
如果您很少重啟消費(fèi)者并且總是希望它處理新數(shù)據(jù)而不是趕上所有丟失的消息,則此策略很好。
編輯
據(jù)我所知,訪問底層 KafkaConsumer 的唯一方法是使用committableExternalSource
. 這樣您就可以訪問該seekToEnd
方法,但是您還需要注意訂閱提供每個分區(qū)的起始偏移量的主題(類似于您現(xiàn)在在AkkacommittablePartitionedSource
之外設(shè)置的方式)。

TA貢獻(xiàn)1828條經(jīng)驗 獲得超4個贊
commitablePartitionedSource將AutoSubscription其作為輸入,您不能指定偏移量。
您需要的是一種采用ManualSubscription或更高級別的方法Subscription,例如
> plainExternalSource
> committableExternalSource
> plainSource
...
添加回答
舉報