I am trying to run 2 consumers subscribed to 2 different topics. Each consumer program runs fine when run on its own, but when both run at the same time, one of them always throws this exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I followed the suggestion and set max.poll.records to 2, session.timeout.ms to 30000, and heartbeat.interval.ms to 1000. Below is my consumer function. The function is identical in both files, except that the topic name is changed to Test2, and I run the two functions in 2 different classes at the same time.

public void consume() {
    // Kafka consumer configuration settings
    List<String> topicNames = new ArrayList<String>();
    topicNames.add("Test1");
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("heartbeat.interval.ms", "1000");
    props.put("max.poll.records", "2");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(topicNames);

Because of this error, offsets are not committed for the Kafka topic. How can I get past this error?
1 Answer

倚天杖
In your case, you need to assign a different group ID to each consumer. You are creating two consumers with the same group ID (which is fine on its own), but having two separate consumer instances in that same group call subscribe is not: they will keep rebalancing against each other.
That is why you can run one consumer at a time without problems: subscribe is then only called once within the group.
Let me know if you need any further help. Happy to help.
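A minimal sketch of the fix, using only the configuration from the question: build the same Properties for each consumer but give each one its own group.id. The group names "test-group-1" and "test-group-2" are placeholders I chose, not from the original post; in the real application you would pass each Properties object to new KafkaConsumer<>(props) and subscribe it to its topic.

```java
import java.util.Properties;

public class ConsumerConfigs {
    // Build the consumer config for one consumer. Each consumer gets its own
    // group.id, so the two consumers never trigger rebalances against each other.
    static Properties configFor(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId); // distinct per consumer -- the actual fix
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "1000");
        props.put("max.poll.records", "2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // One group per consumer; in the real app these Properties would each
        // go to new KafkaConsumer<>(...), subscribed to "Test1" and "Test2".
        Properties test1 = configFor("test-group-1");
        Properties test2 = configFor("test-group-2");
        System.out.println(test1.getProperty("group.id")); // prints "test-group-1"
        System.out.println(test2.getProperty("group.id")); // prints "test-group-2"
    }
}
```

With distinct groups, each consumer owns all partitions of its topic independently, so a slow poll loop in one consumer can no longer cause a CommitFailedException in the other.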