考慮以下代碼:@Test(singleThreaded = true)public class KafkaConsumerTest{ private KafkaTemplate<String, byte[]> template; private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory; private static final KafkaEmbedded EMBEDDED_KAFKA; static { EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic"); try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); } } @BeforeMethod public void setUp() throws Exception { final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString()); senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps); this.template = new KafkaTemplate<>(pf); this.template.setDefaultTopic("topic"); final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA); this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps); this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer()); this.consumerFactory.setKeyDeserializer(new StringDeserializer()); }我正在嘗試向 a 發(fā)送消息KafkaTemplate并使用Consumer.poll(). 我使用的測試框架是TestNG。發(fā)送作品,我已經(jīng)驗證使用我在網(wǎng)上找到的“常用”代碼(在 上注冊一個消息偵聽器KafkaMessageListenerContainer)。只是,我從來沒有在消費者那里收到過任何東西。我已經(jīng)針對“真實”的 Kafka 安裝嘗試了相同的序列 (create Consumer, poll()),并且它有效。因此,我設(shè)置ConsumerFactory? 任何幫助將不勝感激!
添加回答
舉報
0/150
提交
取消