我正在嘗試為我的 Kafka 使用者編寫集成測試。我已經(jīng)遵循了官方參考文檔,但是當我開始測試時,我只看到這個重復的廣告無限期:-2019-04-03 15:47:34.002 WARN 13120 --- [主] 組織.apache.kafka.clients.Network客戶端 : [消費者客戶端 Id=消費者-1,groupId=我的-組] 無法建立與節(jié)點 -1 的連接。經(jīng)紀人可能不可用。我做錯了什么?我正在使用 JUnit5、彈簧靴和 .spring-kafkaspring-kafka-test我有我的班級的注釋。@EnableKafka@Configuration我的測試類是這樣的:@ExtendWith(SpringExtension::class)@SpringBootTest(classes = [TestKafkaConfig::class])@DirtiesContext@EmbeddedKafka( partitions = 1, topics = [KafkaIntegrationTest.MY_TOPIC])class KafkaIntegrationTest { @Autowired private lateinit var embeddedKafka: EmbeddedKafkaBroker @Test fun test() { val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString) val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps)) template.defaultTopic = KafkaIntegrationTest.MY_TOPIC template.sendDefault("foo") }}我看起來像這樣:application.ymlkafka: consumer: group-id: my-group bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092} value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081} specific.avro.reader: true我也嘗試過設置一個,但我得到完全相同的重復消息。(這就是我嘗試設置的方式):MockSchemaRegistryClientMockSchemaRegistryClient@TestConfiguration@Import(TestConfig::class)class TestKafkaConfig { @Autowired private lateinit var props: KafkaProperties @Bean fun schemaRegistryClient() = MockSchemaRegistryClient() @Bean fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient()) @Bean fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())我做錯了什么?請注意,我正在使用融合模式注冊表,并嘗試從Avro反序列化。
1 回答

炎炎設計
TA貢獻1808條經(jīng)驗 獲得超4個贊
我相信您錯過了為測試設置代理 URL。
文檔中有一條關(guān)于如何獲取此值的說明:
當嵌入式卡夫卡和嵌入式動物園管理員服務器由嵌入式卡夫布魯克啟動時,名為 spring.embedded.kafka.brokers 的系統(tǒng)屬性將設置為卡夫卡代理的地址,而名為 spring.embedded.zookeeper.連接的系統(tǒng)屬性將設置為動物園管理員的地址。為此屬性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。
(它位于此處的 junit 部分的底部)
解決此問題的一種方法是在測試中將此值設置為此值,例如kafka.consumers.bootstrap-servers
spring: kafka: consumer: bootstrap-servers: ${spring.embedded.kafka.brokers}
添加回答
舉報
0/150
提交
取消