第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

AVRO 原始類型的 Serde 類

AVRO 原始類型的 Serde 類

蕪湖不蕪 2021-08-04 17:04:21
我正在用 Java 編寫一個(gè) Kafka 流應(yīng)用程序,它接受由連接器創(chuàng)建的輸入主題,該連接器使用模式注冊表和 avro 作為鍵和值轉(zhuǎn)換器。連接器產(chǎn)生以下模式:key-schema: "int"value-schema:{"type": "record","name": "User","fields": [    {"name": "firstname", "type": "string"},    {"name": "lastname",  "type": "string"}]}實(shí)際上,有幾個(gè)主題,key-schema 總是“int”,value-schema 總是某種記錄(用戶、產(chǎn)品等)。我的代碼包含以下定義Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);Serde<User> userSerde = new SpecificAvroSerde<>();userSerde.configure(serdeConfig, false);起初我嘗試使用類似的東西來消費(fèi)這個(gè)主題, Consumed.with(Serdes.Integer(), userSerde);但這不起作用,因?yàn)?Serdes.Integer() 期望使用 4 個(gè)字節(jié)對整數(shù)進(jìn)行編碼,但 avro 使用可變長度編碼。使用Consumed.with(Serdes.Bytes(), userSerde);有效,但我真的想要 int 而不是字節(jié),所以我將代碼更改為此KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();keyDeserializer.configure(serdeConfig, true); keySerializer.configure(serdeConfig, true);Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);這使編譯器產(chǎn)生警告(它不喜歡(Serde<Integer>)(Serde)強(qiáng)制轉(zhuǎn)換)但它允許我使用Consumed.with(keySerde, userSerde);并獲取一個(gè)整數(shù)作為鍵。這工作得很好,我的應(yīng)用程序按預(yù)期運(yùn)行(很棒?。。。?。但是現(xiàn)在我想為鍵/值定義默認(rèn)的 serde 并且我無法讓它工作。設(shè)置默認(rèn)值 serde 很簡單:streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);但是我無法弄清楚如何定義默認(rèn)鍵 serde。我試過了streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 產(chǎn)生運(yùn)行時(shí)錯(cuò)誤:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共無參數(shù)構(gòu)造函數(shù)streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 產(chǎn)生運(yùn)行時(shí)錯(cuò)誤:java.lang.Integer 不能轉(zhuǎn)換為 org.apache.avro.specific.SpecificRecord我錯(cuò)過了什么?謝謝。
查看完整描述

2 回答

?
翻翻過去那場雪

TA貢獻(xiàn)2065條經(jīng)驗(yàn) 獲得超14個(gè)贊

我想發(fā)布解決方案的工作。請隨意增強(qiáng)它。


import java.util.Collections;

import java.util.Map;


import org.apache.kafka.common.serialization.Deserializer;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.common.serialization.Serializer;


import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;


public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {


    private final Serde<Object> inner;


    /**

     * Constructor used by Kafka Streams.

     */

    public GenericPrimitiveAvroSerDe() {

        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());

    }


    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {

        this(client, Collections.emptyMap());

    }


    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {

        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));

    }


    @Override

    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {

        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);

        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);

    }


    @Override

    public void close() {

        // TODO Auto-generated method stub

        inner.serializer().close();

        inner.deserializer().close();


    }


    @SuppressWarnings("unchecked")

    @Override

    public Serializer<T> serializer() {

        // TODO Auto-generated method stub

        Object obj = inner.serializer();

        return (Serializer<T>) obj;


    }


    @SuppressWarnings("unchecked")

    @Override

    public Deserializer<T> deserializer() {

        // TODO Auto-generated method stub

        Object obj = inner.deserializer();

        return (Deserializer<T>) obj;


    }


}

用作默認(rèn)流配置:


props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);

            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);

覆蓋默認(rèn)值:


final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",

                                                                        "http://localhost:8081");

       final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();

       keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys

       final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();

       valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values



查看完整回答
反對 回復(fù) 2021-08-04
  • 2 回答
  • 0 關(guān)注
  • 154 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)