第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 Apache Beam 反序列化 Kafka AVRO 消息

使用 Apache Beam 反序列化 Kafka AVRO 消息

慕村225694 2022-06-23 20:01:30
主要目標(biāo)是聚合兩個 Kafka 主題,一個是壓縮的慢速移動數(shù)據(jù),另一個是每秒接收的快速移動數(shù)據(jù)。我已經(jīng)能夠在簡單的場景中使用消息,例如使用以下內(nèi)容的 KV (Long,String):PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, String>read().withKeyDeserializer(LongDeserializer.class).withValueDeserializer(StringDeserializer.class)  PCollection<String> output = input.apply(Values.<String>create());但這似乎不是當(dāng)您需要從 AVRO 反序列化時的方法。我有一個需要消耗的 KV(STRING, AVRO)。我嘗試從 AVRO 模式生成 Java 類,然后將它們包含在“應(yīng)用”中,例如:PCollection<MyClass> output = input.apply(Values.<MyClass>create());但這似乎不是正確的方法。是否有任何人可以指出我的文檔/示例,以便我了解您將如何使用 Kafka AVRO 和 Beam?我已經(jīng)更新了我的代碼:import io.confluent.kafka.serializers.KafkaAvroDeserializer;import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.coders.AvroCoder;import org.apache.beam.sdk.io.kafka.KafkaIO;import org.apache.beam.sdk.options.PipelineOptions;import org.apache.beam.sdk.options.PipelineOptionsFactory;import org.apache.beam.sdk.values.KV;import org.apache.beam.sdk.values.PCollection;import org.apache.kafka.common.serialization.LongDeserializer;public class Main {public static void main(String[] args) {    PipelineOptions options = PipelineOptionsFactory.create();    Pipeline p = Pipeline.create(options);    PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()            .withKeyDeserializer(LongDeserializer.class)            .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))    );    p.run();}}import org.apache.beam.sdk.coders.AvroCoder;import org.apache.beam.sdk.coders.DefaultCoder;@DefaultCoder(AvroCoder.class)public class Myclass{String name;String age;Myclass(){}Myclass(String n, String a) {    this.name= n;    this.age= a;}}但我現(xiàn)在收到以下錯誤incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >我必須導(dǎo)入不正確的序列化程序?
查看完整描述

4 回答

?
SMILET

TA貢獻(xiàn)1796條經(jīng)驗 獲得超4個贊

我遇到了同樣的問題。在此郵件存檔中找到了解決方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E


在您的情況下,您需要定義自己的,它可以從如下Deserializer<MyClass>擴(kuò)展。AbstractKafkaAvroDeserializer


public class MyClassKafkaAvroDeserializer extends

  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

  

  @Override

  public void configure(Map<String, ?> configs, boolean isKey) {

      configure(new KafkaAvroDeserializerConfig(configs));

  }


  @Override

  public MyClass deserialize(String s, byte[] bytes) {

      return (MyClass) this.deserialize(bytes);

  }


  @Override

  public void close() {} }

然后將您的KafkaAvroDeserializer指定為 ValueDeserializer。


p.apply(KafkaIO.<Long, MyClass>read()

 .withKeyDeserializer(LongDeserializer.class)

 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );


查看完整回答
反對 回復(fù) 2022-06-23
?
小唯快跑啊

TA貢獻(xiàn)1863條經(jīng)驗 獲得超2個贊

您可以使用 KafkaAvroDeserializer,如下所示:


PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()

.withKeyDeserializer(LongDeserializer.class)

  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

其中MyClass是 POJO 類生成的 Avro Schema。


確保您的 POJO 類具有注釋 AvroCoder,如下例所示:


@DefaultCoder(AvroCoder.class)

   public class MyClass{

      String name;

      String age;


      MyClass(){}

      MyClass(String n, String a) {

         this.name= n;

         this.age= a;

      }

  }


查看完整回答
反對 回復(fù) 2022-06-23
?
Cats萌萌

TA貢獻(xiàn)1805條經(jīng)驗 獲得超9個贊

我今天遇到了類似的問題,并遇到了以下示例,它為我解決了這個問題。

https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java

對我來說缺少的部分是(類)KafkaAvroDeserializer

KafkaIO.<String, MyClass>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("dbserver1.inventory.customers")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))



查看完整回答
反對 回復(fù) 2022-06-23
?
慕斯709654

TA貢獻(xiàn)1840條經(jīng)驗 獲得超5個贊

我也發(fā)現(xiàn)這行得通


import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;


...


public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}


...

.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))

...

MyCustomClass使用 Avro 工具生成的代碼在哪里。


查看完整回答
反對 回復(fù) 2022-06-23
  • 4 回答
  • 0 關(guān)注
  • 142 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號