4 回答

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超3個(gè)贊
我稍微調(diào)查了一下,發(fā)現(xiàn)問題出在 KafkaAvroSerializer/Deserializer 使用的 CashedSchemaRegistryClient 中。它用于從 Confluent Schema Registry 中獲取模式定義。
您已經(jīng)在本地?fù)碛屑軜?gòu)定義,因此您無需為它們轉(zhuǎn)到架構(gòu)注冊(cè)表。(至少在你的測(cè)試中)
我有一個(gè)類似的問題,我通過創(chuàng)建自定義 KafkaAvroSerializer/KafkaAvroDeserializer 解決了它。
這是 KafkaAvroSerializer 的示例。這很簡(jiǎn)單。您只需要擴(kuò)展提供的 KafkaAvroSerializer 并告訴他使用 MockSchemaRegistryClient。
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
? ? public CustomKafkaAvroSerializer() {
? ? ? ? super();
? ? ? ? super.schemaRegistry = new MockSchemaRegistryClient();
? ? }
? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
? ? ? ? super(new MockSchemaRegistryClient());
? ? }
? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
? ? ? ? super(new MockSchemaRegistryClient(), props);
? ? }
}
這是 KafkaAvroDeserializer 的示例。當(dāng)調(diào)用反序列化方法時(shí),您需要告訴他要使用哪個(gè)模式。
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
? ? @Override
? ? public Object deserialize(String topic, byte[] bytes) {
? ? ? ? this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);??
? ? ? ? return super.deserialize(topic, bytes);
? ? }
? ? private static SchemaRegistryClient getMockClient(final Schema schema$) {
? ? ? ? return new MockSchemaRegistryClient() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public synchronized Schema getById(int id) {
? ? ? ? ? ? ? ? return schema$;
? ? ? ? ? ? }
? ? ? ? };
? ? }
}
最后一步是告訴 spring 使用創(chuàng)建的序列化器/反序列化器
spring.kafka.producer.properties.schema.registry.url= not-used
spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id = showcase-producer-id
spring.kafka.consumer.properties.schema.registry.url= not-used
spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = showcase-consumer-id
spring.kafka.auto.offset.reset = earliest
spring.kafka.producer.auto.register.schemas= true
spring.kafka.properties.specific.avro.reader= true

TA貢獻(xiàn)1780條經(jīng)驗(yàn) 獲得超1個(gè)贊
如果你在 3 年后看這個(gè)例子,你可能想要對(duì) CustomKafkaAvroDeserializer 做一些小的修改
private static SchemaRegistryClient getMockClient(final Schema schema) {
return new MockSchemaRegistryClient() {
@Override
public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
throws IOException, RestClientException {
return new AvroSchema(schema);
}
};
}

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超4個(gè)贊
如果您的 @KafkaListener 在測(cè)試類中,那么您可以在 StringDeserializer 中讀取它,然后手動(dòng)將其轉(zhuǎn)換為所需的類
@Autowired
private MyKafkaAvroDeserializer myKafkaAvroDeserializer;
@KafkaListener( topics = "test")
public void inputData(ConsumerRecord<?, ?> consumerRecord) {
log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());
GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));
Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
}
@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
@Override
public Object deserialize(String topic, byte[] bytes) {
this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
return super.deserialize(topic, bytes);
}
private static SchemaRegistryClient getMockClient(final Schema schema$) {
return new MockSchemaRegistryClient() {
@Override
public synchronized org.apache.avro.Schema getById(int id) {
return schema$;
}
};
}
}
記得在 application.yml 中添加 schema registry 和 key/value serializer 雖然不會(huì)用到
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url :http://localhost:8080

TA貢獻(xiàn)1875條經(jīng)驗(yàn) 獲得超5個(gè)贊
如錯(cuò)誤所述,您需要在生產(chǎn)者配置中向注冊(cè)表提供一個(gè)字符串,而不是一個(gè)對(duì)象。
由于您使用的是 Mock 類,因此該字符串可以是任何東西......
但是,您需要在給定注冊(cè)表實(shí)例的情況下構(gòu)造序列化程序
Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
// make config map with ("schema.registry.url", "unused")
serializer.configure(config, false);
否則,它將嘗試創(chuàng)建一個(gè)非模擬客戶端
并將其放入屬性中
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
添加回答
舉報(bào)