我無法升級到 Spring 5,所以我堅持使用 spring-kafka 1.3 及其有限的錯誤處理。所以我無法訪問 spring-kafka 2 中的 ConsumerAwareErrorHandler 或 SeekToCurrent 錯誤處理程序。我正在使用@KafkaListener-annotated 方法來收聽一個主題,我已將其配置io.confluent.kafka.serializers.KafkaAvroDeserializer為我的值反序列化器。問題是,如果我在主題中得到一條不是 Avro 格式的消息,那么 KafkaMessageListenerContainer 輪詢循環(huán)就會卡住。反序列化程序會在消息上引發(fā)異常,并且輪詢循環(huán)永遠不會越過它,因此下次通過循環(huán)時,它會嘗試反序列化相同的消息并繼續(xù)循環(huán),每秒將相同的錯誤轉(zhuǎn)儲到我的日志中數(shù)千次。似乎沒有辦法獲得 NeverRetryPolicy 或邊緣方面的任何東西,但我可以factory.getContainerProperties().setErrorHandler()。不幸的是,我不確定我能從那里做什么。有什么東西可以自動連接到我的錯誤處理程序中,我可以用它來尋找錯誤時的前向 1 個偏移量嗎?不確定那是什么,文檔并沒有過多地談?wù)撃梢允褂?ErrorHandler 實際做什么,我能找到的大多數(shù)示例都是針對 spring-kafka 2.X 的。就像,它不會反序列化,我對消息無能為力,它永遠不會起作用,我想避免再次重試它,而且似乎大多數(shù) Stackoverflow 問題都是關(guān)于做相反的事情。我還看到有些人只是將 Avro 的 Deserializer 包裝在他們自己的類中,該類會吃掉異常并返回 null。這是一個更好的計劃嗎?
1 回答

幕布斯7119047
TA貢獻1794條經(jīng)驗 獲得超8個贊
問題是反序列化在 Spring 獲取數(shù)據(jù)之前就失敗了——問題出在 Kafka 本身。
在 2.2 中,我們添加了ErrorHandlingDeserializer2
包裝真正的反序列化器并向偵聽器容器發(fā)送信號,以便可以將錯誤發(fā)送到錯誤處理程序。
在舊版本中,您需要編寫自己的反序列化器包裝器 - 但是,容器中沒有代碼來處理這種情況,因此您的 catch 塊需要返回一個真實的對象,該對象是向您的偵聽器發(fā)出反序列化失敗的信號.
假設(shè)您的聽眾收到Invoice
. 您的 catch 塊可以創(chuàng)建一個子類,BadInvoice
然后說您可以在偵聽器中檢測并丟棄它。
我還看到有些人只是將 Avro 的 Deserializer 包裝在他們自己的類中,該類會吃掉異常并返回 null。這是一個更好的計劃嗎?
如果您從未獲得真正的空記錄,您可以返回null
,但您必須添加@Payload(required = false)
到方法參數(shù)。
添加回答
舉報
0/150
提交
取消