所以我試圖將 Kafka 用于我的應(yīng)用程序,它有一個(gè)生產(chǎn)者將操作記錄到 Kafka MQ 中,而消費(fèi)者從 MQ 中讀取它。由于我的應(yīng)用程序在 Go 中,我使用 Shopify Sarama 使這成為可能?,F(xiàn)在,我可以讀取 MQ 并使用fmt.Printf然而,我真的希望錯(cuò)誤處理比控制臺(tái)打印更好,我愿意加倍努力?,F(xiàn)在用于消費(fèi)者連接的代碼:mqCfg := sarama.NewConfig()master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)if err != nil { panic(err) // Don't want to panic when error occurs, instead handle it}以及消息的處理: go func() { defer wg.Done() for message := range consumer.Messages() { var msgContent Message _ = json.Unmarshal(message.Value, &msgContent) fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it }}()我的問(wèn)題(我是測(cè)試 Kafka 的新手,一般來(lái)說(shuō)也是 kafka 的新手):上面程序中哪里會(huì)出現(xiàn)錯(cuò)誤,以便我可以處理它們?任何示例代碼對(duì)我來(lái)說(shuō)都是很好的開(kāi)始。我能想到的錯(cuò)誤條件是 msgContent 在 JSON 中并不真正包含任何類型的 ContentId 字段。在 kafka 中,是否存在消費(fèi)者嘗試以當(dāng)前偏移量讀取但由于某種原因無(wú)法讀取的情況(即使 JSON 格式正確)?我的消費(fèi)者是否有可能回溯在失敗的偏移讀取上方說(shuō) x 步并重新處理偏移?或者有沒(méi)有更好的方法來(lái)做到這一點(diǎn)?再次,這些情況會(huì)是什么?我愿意閱讀和嘗試。
- 1 回答
- 0 關(guān)注
- 374 瀏覽
添加回答
舉報(bào)
0/150
提交
取消