2 回答

TA貢獻(xiàn)1893條經(jīng)驗 獲得超10個贊
例如,通過新的通道-3將戈魯廷-3的反饋添加到戈魯廷-1。戈魯廷-1將阻塞,直到它得到頻道-3的確認(rèn)。
// in gorouting 1
channel1 <- data
select {
case <-channel3:
case <-ctx.Done(): // or smth else to prevent deadlock
}
...
// in gorouting 3
data := <-channel2
for {
if err := sendData(data); err == nil {
break
}
}
channel3<-struct{}{}

TA貢獻(xiàn)1934條經(jīng)驗 獲得超2個贊
為了確保正確性,您需要在處理成功完成后提交(=確認(rèn))消息。
對于處理未成功完成的情況 - 通常,您需要自己實現(xiàn)重試機(jī)制。
這應(yīng)該特定于您的用例,但通常您將消息拋回專用的Kafka重試主題(您創(chuàng)建),添加睡眠并再次處理消息。如果在 x 次后處理失敗 - 則將消息拋出到 DLQ(=死信隊列)。
你可以在這里閱讀更多:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
- 2 回答
- 0 關(guān)注
- 102 瀏覽
添加回答
舉報