我正在構建一個微服務組件,它將默認使用由其他 (SCS) 組件生成的 Spring Cloud Stream (SCS) Kafka 消息。但是我還需要從使用融合 API 的其他組件中使用 Kafka 消息。我有一個示例存儲庫,顯示了我正在嘗試做什么。https://github.com/donalthurley/KafkaConsumeScsAndConfluent這是下面帶有 SCS 輸入綁定和融合輸入綁定的應用程序配置。spring: application: name: kafka kafka: consumer: properties.schema.registry.url: http://192.168.99.100:8081 cloud: stream: kafka: binder: brokers: PLAINTEXT://192.168.99.100:9092# configuration:# specific:# avro:# reader: true# key:# deserializer: org.apache.kafka.common.serialization.StringDeserializer# value:# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer bindings: inputConfluent: contentType: application/*+avro destination: confluent-destination group: input-confluent-group inputScs: contentType: application/*+avro destination: scs-destination group: input-scs-group通過上述配置,我使用 SCS 默認配置創(chuàng)建了兩個使用者。例如,類 org.apache.kafka.common.serialization.ByteArrayDeserializer 是兩個輸入綁定的值反序列化器。如果我刪除上述配置中的注釋,我會從我的 Confluent 客戶端發(fā)送配置的兩個消費者。例如,類 io.confluent.kafka.serializers.KafkaAvroDeserializer 是兩個輸入綁定的值反序列化器。我理解因為配置在 Kafka 綁定器上,它將適用于使用該綁定器定義的所有消費者。有什么方法可以定義這些特定屬性,以便它們僅適用于融合的特定消費者綁定,而所有其他輸入綁定都可以使用默認的 SCS 配置?
添加回答
舉報
0/150
提交
取消