當(dāng)我使用新的組 ID 注冊(cè)消費(fèi)者時(shí),前 N 次輪詢(xún)調(diào)用不返回任何內(nèi)容。我想測(cè)試當(dāng)我調(diào)用服務(wù)時(shí),會(huì)發(fā)布一個(gè) Kafka 事件。問(wèn)題是每當(dāng)我更改 groupId 時(shí),前 N 個(gè)民意調(diào)查都不會(huì)返回任何內(nèi)容。我了解 Kafka 在輪詢(xún)時(shí)首先注冊(cè)消費(fèi)者,但我發(fā)現(xiàn)注冊(cè)消費(fèi)者所需的輪詢(xún)次數(shù)(時(shí)間)過(guò)于隨機(jī)。消費(fèi)者配置:Properties props = new Properties();props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));腳步:在每次測(cè)試之前,我consumer.poll(Duration.ofSeconds(5))只是為了確保消費(fèi)者已注冊(cè)并設(shè)置了偏移量。我調(diào)用服務(wù)并斷言響應(yīng)。如果我使用 UI 檢查 Kafka,則會(huì)發(fā)布事件。我打電話(huà)consumer.poll(Duration.ofSeconds(5)),希望能收到一些記錄。這是失敗的一步。有沒(méi)有辦法確保第二次投票總是返回記錄?我試圖讓第一次投票持續(xù) 1 分鐘(我已經(jīng)認(rèn)為 5 秒對(duì)于等待每次測(cè)試來(lái)說(shuō)太長(zhǎng)了),它有時(shí)仍然有效,有時(shí)無(wú)效。謝謝。
1 回答

一只萌萌小番薯
TA貢獻(xiàn)1795條經(jīng)驗(yàn) 獲得超7個(gè)贊
它不適用于您的“新 groupId”的原因是您處于“最新”模式。
默認(rèn)值為“最新”,您需要處于“最早”模式或使用您的“新 groupId”首次輪詢(xún)或?yàn)榇酥黝}的此“新 groupId”提交偏移量。
您需要將“groupId”注冊(cè)到主題,而不是消費(fèi)者。
添加回答
舉報(bào)
0/150
提交
取消