我使用此配置創(chuàng)建一個新消費者:c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": addresses, "group.id": "my_group", "auto.offset.reset": "earliest", })topic := "testTopic"if err = c.SubscribeTopics([]string{topic}, nil); err != nil { panic(err)}然后我根據(jù)以下代碼生成事件并使用一個事件:events := []map[string]string{{ "name": "Foo",}, { "name": "Bar", }, } err = p.ProduceEvent(events[0])//there is a wrapper to produce events err = p.ProduceEvent(events[1]) res, err := c.ReadMessage(100 * time.Second) time.Sleep(20 * time.Second) c.Close() 當(dāng)我用 描述該組時 watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe。每一步的結(jié)果是:產(chǎn)生事件后: 當(dāng)我消費一個事件時: 關(guān)閉消費者后: 我不明白為什么最后滯后為零!我只消耗了一個事件。這對我來說很奇怪,那Close會改變偏移量。任何線索表示贊賞。
1 回答

拉風(fēng)的咖菲貓
TA貢獻1995條經(jīng)驗 獲得超2個贊
ReadMessage
包裹Poll
。Poll
獲取一批消息并在本地緩沖它們。由于您已將消費者配置為自動提交偏移量,因此它將提交所有獲取的消息,甚至是那些在本地緩存且您的應(yīng)用程序仍未處理的消息。這就是為什么您看到關(guān)閉消費者后沒有延遲。
librdkafka
(因此confluent-kafka-go
)沒有辦法配置max.pool.records
,所以如果你想準(zhǔn)確控制哪些偏移量被提交,你需要禁用自動提交偏移量并使用手動提交它們StoreOffsets
:https ://github.com/confluentinc/confluent- kafka-go/issues/380#issuecomment-539903016
- 1 回答
- 0 關(guān)注
- 337 瀏覽
添加回答
舉報
0/150
提交
取消