我們在我們的項目中使用 Spring Kafka 2.1.4.RELEASE 版本,我們有以下配置:@EnableKafkapublic class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Configuration class ProducerConfig { @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ASerializer.class); return props; } @Bean public ProducerFactory<String, A> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, A> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } @Configuration class ConsumerConfig { @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${spring.kafka.consumer.max-poll-records}") private Integer maxPollRecords; @Bean public Map<String, Object> firstConsumerConfig() { Map<String, Object> props = getCommonConsumerConfig(); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ADeserializer.class); return props; }所以我們在啟動這個應(yīng)用程序時注意到它并不是一直連接到這兩個主題。有時它僅連接到第二個主題或僅連接到第一個主題,并且可能連接到第一個和第二個主題(這是正確的)。那么你能幫助理解這里配置錯誤嗎?
2 回答

繁花如伊
TA貢獻2012條經(jīng)驗 獲得超12個贊
通常最佳做法是將每個偵聽器放在不同的位置group.id
(您可以使用覆蓋消費者工廠的groupId
屬性@KafkaListener
)。否則,當?shù)诙€開始時,第一個會導(dǎo)致重新平衡。當前的 2.1.x 版本是 2.1.10。

互換的青春
TA貢獻1797條經(jīng)驗 獲得超6個贊
好的,經(jīng)過更多調(diào)查后,我能夠確定我這邊發(fā)生了什么樣的問題。所以基本上我們有一個包含多個主題的消費者組。因此,在我的情況下,我們?yōu)槊總€主題設(shè)置了 0 個分區(qū)(據(jù)我所知,沒有分區(qū),我們使用主題的 1 個隊列進行操作)。因此,當我連接到該 kafka 實例時 - 所有消費者都連接到這些主題,但是當有人也連接到該主題(可能是我的同事)時,正在發(fā)生重新平衡,他開始聽這些主題之一而不是我(由于事實上每個分區(qū)只能有一個使用者)。
添加回答
舉報
0/150
提交
取消