4 回答

TA貢獻(xiàn)1796條經(jīng)驗 獲得超4個贊
我遇到了同樣的問題。在此郵件存檔中找到了解決方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E
在您的情況下,您需要定義自己的,它可以從如下Deserializer<MyClass>擴(kuò)展。AbstractKafkaAvroDeserializer
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}
@Override
public void close() {} }
然后將您的KafkaAvroDeserializer指定為 ValueDeserializer。
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );

TA貢獻(xiàn)1863條經(jīng)驗 獲得超2個贊
您可以使用 KafkaAvroDeserializer,如下所示:
PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
其中MyClass是 POJO 類生成的 Avro Schema。
確保您的 POJO 類具有注釋 AvroCoder,如下例所示:
@DefaultCoder(AvroCoder.class)
public class MyClass{
String name;
String age;
MyClass(){}
MyClass(String n, String a) {
this.name= n;
this.age= a;
}
}

TA貢獻(xiàn)1805條經(jīng)驗 獲得超9個贊
我今天遇到了類似的問題,并遇到了以下示例,它為我解決了這個問題。
對我來說缺少的部分是(類)KafkaAvroDeserializer
KafkaIO.<String, MyClass>read() .withBootstrapServers("kafka:9092") .withTopic("dbserver1.inventory.customers") .withKeyDeserializer(StringDeserializer.class) .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))

TA貢獻(xiàn)1840條經(jīng)驗 獲得超5個贊
我也發(fā)現(xiàn)這行得通
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...
MyCustomClass使用 Avro 工具生成的代碼在哪里。
添加回答
舉報