第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

使用 Avro Schema 注冊(cè)表的 Kafka 消費(fèi)者單元測(cè)試失敗

使用 Avro Schema 注冊(cè)表的 Kafka 消費(fèi)者單元測(cè)試失敗

紅糖糍粑 2023-06-14 14:00:11
我正在編寫一個(gè)消費(fèi)者,它會(huì)收聽 Kafka 主題并在消息可用時(shí)使用消息。我通過在本地運(yùn)行 Kafka 測(cè)試了邏輯/代碼,它運(yùn)行良好。在編寫單元/組件測(cè)試用例時(shí),它因 avro 架構(gòu)注冊(cè)表 url 錯(cuò)誤而失敗。我嘗試了互聯(lián)網(wǎng)上可用的不同選項(xiàng),但找不到任何有效的方法。我不確定我的方法是否正確。請(qǐng)幫忙。監(jiān)聽類@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {        try {            GenericRecord generic = consumerRecord.value();            Object obj = generic.get("metadata");            ObjectMapper mapper = new ObjectMapper();            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);            System.out.println("Received payload :   " + consumerRecord.value());            //Call backend with details in GenericRecord         }catch (Exception e){            System.out.println("Exception while reading message from Kafka " + e );        }卡夫卡配置@Bean    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(genericConsumerFactory());        return factory;    }public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {        Map<String, Object> config = new HashMap<>();        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");        return new DefaultKafkaConsumerFactory<>(config);    }
查看完整描述

4 回答

?
泛舟湖上清波郎朗

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超3個(gè)贊

我稍微調(diào)查了一下,發(fā)現(xiàn)問題出在 KafkaAvroSerializer/Deserializer 使用的 CashedSchemaRegistryClient 中。它用于從 Confluent Schema Registry 中獲取模式定義。


您已經(jīng)在本地?fù)碛屑軜?gòu)定義,因此您無需為它們轉(zhuǎn)到架構(gòu)注冊(cè)表。(至少在你的測(cè)試中)


我有一個(gè)類似的問題,我通過創(chuàng)建自定義 KafkaAvroSerializer/KafkaAvroDeserializer 解決了它。


這是 KafkaAvroSerializer 的示例。這很簡(jiǎn)單。您只需要擴(kuò)展提供的 KafkaAvroSerializer 并告訴他使用 MockSchemaRegistryClient。


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {

? ? public CustomKafkaAvroSerializer() {

? ? ? ? super();

? ? ? ? super.schemaRegistry = new MockSchemaRegistryClient();

? ? }


? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client) {

? ? ? ? super(new MockSchemaRegistryClient());

? ? }


? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {

? ? ? ? super(new MockSchemaRegistryClient(), props);

? ? }

}

這是 KafkaAvroDeserializer 的示例。當(dāng)調(diào)用反序列化方法時(shí),您需要告訴他要使用哪個(gè)模式。


public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {

? ? @Override

? ? public Object deserialize(String topic, byte[] bytes) {

? ? ? ? this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);??

? ? ? ? return super.deserialize(topic, bytes);

? ? }


? ? private static SchemaRegistryClient getMockClient(final Schema schema$) {

? ? ? ? return new MockSchemaRegistryClient() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public synchronized Schema getById(int id) {

? ? ? ? ? ? ? ? return schema$;

? ? ? ? ? ? }

? ? ? ? };

? ? }

}

最后一步是告訴 spring 使用創(chuàng)建的序列化器/反序列化器


spring.kafka.producer.properties.schema.registry.url= not-used

spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.group-id = showcase-producer-id


spring.kafka.consumer.properties.schema.registry.url= not-used

spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.group-id = showcase-consumer-id

spring.kafka.auto.offset.reset = earliest


spring.kafka.producer.auto.register.schemas= true

spring.kafka.properties.specific.avro.reader= true

查看完整回答
反對(duì) 回復(fù) 2023-06-14
?
慕神8447489

TA貢獻(xiàn)1780條經(jīng)驗(yàn) 獲得超1個(gè)贊

如果你在 3 年后看這個(gè)例子,你可能想要對(duì) CustomKafkaAvroDeserializer 做一些小的修改


private static SchemaRegistryClient getMockClient(final Schema schema) {

        return new MockSchemaRegistryClient() {


     @Override

     public ParsedSchema getSchemaBySubjectAndId(String subject, int id)

                    throws IOException, RestClientException {

         return new AvroSchema(schema);

     }            

 };

}


查看完整回答
反對(duì) 回復(fù) 2023-06-14
?
浮云間

TA貢獻(xiàn)1829條經(jīng)驗(yàn) 獲得超4個(gè)贊

如果您的 @KafkaListener 在測(cè)試類中,那么您可以在 StringDeserializer 中讀取它,然后手動(dòng)將其轉(zhuǎn)換為所需的類


    @Autowired

    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;


    @KafkaListener( topics = "test")

    public void inputData(ConsumerRecord<?, ?> consumerRecord) {

        log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());


        GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));



        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);

}

@Component

public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {

    @Override

    public Object deserialize(String topic, byte[] bytes) {


            this.schemaRegistry = getMockClient(Myclass.SCHEMA$);


        return super.deserialize(topic, bytes);

    }




    private static SchemaRegistryClient getMockClient(final Schema schema$) {

        return new MockSchemaRegistryClient() {

            @Override

            public synchronized org.apache.avro.Schema getById(int id) {

                return schema$;

            }

        };

    }

}

記得在 application.yml 中添加 schema registry 和 key/value serializer 雖然不會(huì)用到


    consumer:

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    properties:

      schema.registry.url :http://localhost:8080


查看完整回答
反對(duì) 回復(fù) 2023-06-14
?
慕田峪4524236

TA貢獻(xiàn)1875條經(jīng)驗(yàn) 獲得超5個(gè)贊

如錯(cuò)誤所述,您需要在生產(chǎn)者配置中向注冊(cè)表提供一個(gè)字符串,而不是一個(gè)對(duì)象。


由于您使用的是 Mock 類,因此該字符串可以是任何東西......


但是,您需要在給定注冊(cè)表實(shí)例的情況下構(gòu)造序列化程序


Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);

 // make config map with ("schema.registry.url", "unused") 

serializer.configure(config, false);

否則,它將嘗試創(chuàng)建一個(gè)非模擬客戶端


并將其放入屬性中


producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);


查看完整回答
反對(duì) 回復(fù) 2023-06-14
  • 4 回答
  • 0 關(guān)注
  • 254 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)