I am really struggling to write a test that checks whether my Kafka consumer is invoked correctly when a message is sent to its designated topic.

My consumer:

@Service
@Slf4j
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class ProcessingConsumer {

    private AppService appService;

    @KafkaListener(
            topics = "${topic}",
            containerFactory = "processingConsumerContainerFactory")
    public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) {
        try {
            appService.processMessage(message);
            ack.acknowledge();
        } catch (Throwable t) {
            log.error("error while processing message!", t);
        }
    }
}

My consumer configuration:

@EnableKafka
@Configuration
public class ProcessingCosumerConfig {

    @Value("${spring.kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    private KafkaProperties props;

    public ProcessingCosumerConfig(KafkaProperties kafkaProperties) {
        this.props = kafkaProperties;
    }

    public Map<String, Object> deserializerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        return props;
    }

    private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) {
        KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
        kafkaAvroDeserializer.configure(deserializerConfigs(), isKey);
        return kafkaAvroDeserializer;
    }

    private DefaultKafkaConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                props.buildConsumerProperties(),
                getKafkaAvroDeserializer(true),
                getKafkaAvroDeserializer(false));
    }
1 Answer

人到中年有點甜
Inject a mock AppService into the listener and verify that its processMessage() method was called.
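The idea can be sketched in plain Java without Spring or Kafka on the classpath. This is only an illustration under stated assumptions: `AppService` and `Acknowledgment` below are simplified stand-ins for the real types, the message is a `String` instead of a `ConsumerRecord<Key, Value>`, and `RecordingAppService` and `CountingAck` are hypothetical hand-written test doubles, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerMockSketch {

    // Stand-in for the question's AppService (assumed shape, simplified to String).
    interface AppService {
        void processMessage(String message);
    }

    // Stand-in for Spring Kafka's Acknowledgment.
    interface Acknowledgment {
        void acknowledge();
    }

    // Same control flow as ProcessingConsumer.listen in the question.
    static class ProcessingConsumer {
        private final AppService appService;

        ProcessingConsumer(AppService appService) {
            this.appService = appService;
        }

        void listen(String message, Acknowledgment ack) {
            try {
                appService.processMessage(message);
                ack.acknowledge();
            } catch (Throwable t) {
                System.err.println("error while processing message! " + t);
            }
        }
    }

    // Hypothetical hand-written mock: records every message it is asked to process.
    static class RecordingAppService implements AppService {
        final List<String> received = new ArrayList<>();

        @Override
        public void processMessage(String message) {
            received.add(message);
        }
    }

    // Hypothetical hand-written mock: counts acknowledgements.
    static class CountingAck implements Acknowledgment {
        int count = 0;

        @Override
        public void acknowledge() {
            count++;
        }
    }

    public static void main(String[] args) {
        RecordingAppService appService = new RecordingAppService();
        CountingAck ack = new CountingAck();

        // Inject the mock and call the listener as the container would.
        new ProcessingConsumer(appService).listen("payload", ack);

        // Verify that the listener delegated to the service and acknowledged.
        if (!appService.received.equals(List.of("payload")) || ack.count != 1) {
            throw new AssertionError("listener did not delegate as expected");
        }
        System.out.println("listener delegated to AppService and acknowledged");
    }
}
```

With Mockito available, the hand-written doubles would typically be replaced by `mock(AppService.class)` and the check by `verify(appService).processMessage(record)`. To exercise the `@KafkaListener` wiring itself, the mock would have to be registered as the bean in the test's Spring context, and the verification would need to wait for the asynchronous delivery.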