慕運維8079593
2023-11-10 15:22:50
我想了解我將消費者配置為不自動提交的 kafkaConsumer.poll() 方法的行為 Properties properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapAddress);
KafkaConsumer consumer = new KafkaConsumer(properties);據(jù)我了解,根據(jù) Javadoc,如果我這樣做 ConsumerRecords firstBatch = consumer.poll(0l);
ConsumerRecords secondBatch = consumer.poll(0l);假設(shè)主題中只有一個分區(qū),因為尚未提交偏移量,則firstBatch和都應(yīng)secondBatch包含相同的。ConsumerRecords我的假設(shè)正確嗎?我的問題是,每次我調(diào)用consumer.poll(0l)下一批時ConsumerRecords都會獲取
1 回答

慕慕森
TA貢獻(xiàn)1856條經(jīng)驗 獲得超17個贊
firstBatch 和 secondaryBatch 都應(yīng)包含相同的 ConsumerRecords
offset
這是錯誤的,即使禁用自動或offset
手動提交,Kafka 消費者偏移量也會在每次后續(xù)輪詢中自動增加
抵消和消費者地位
消費者的位置給出了將給出的下一條記錄的偏移量。它將比消費者在該分區(qū)中看到的最高偏移量大 1。每次消費者在調(diào)用 poll(long) 中收到消息時,它都會自動前進
提交的位置是已安全存儲的最后一個偏移量。如果進程失敗并重新啟動,這就是消費者將恢復(fù)到的偏移量。消費者可以定期自動提交偏移量;或者它可以選擇通過調(diào)用提交 API 之一(例如 commitSync 和 commitAsync)來手動控制此提交位置。
你的假設(shè)以另一種方式是正確的,當(dāng)offset
未提交并且kafka消費者重新啟動時,它將輪詢舊批次或從提交舊偏移量的開頭開始。
添加回答
舉報
0/150
提交
取消