2 回答

TA貢獻(xiàn)2065條經(jīng)驗(yàn) 獲得超14個(gè)贊
我想發(fā)布解決方案的工作。請隨意增強(qiáng)它。
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {
private final Serde<Object> inner;
/**
* Constructor used by Kafka Streams.
*/
public GenericPrimitiveAvroSerDe() {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
}
@Override
public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
}
@Override
public void close() {
// TODO Auto-generated method stub
inner.serializer().close();
inner.deserializer().close();
}
@SuppressWarnings("unchecked")
@Override
public Serializer<T> serializer() {
// TODO Auto-generated method stub
Object obj = inner.serializer();
return (Serializer<T>) obj;
}
@SuppressWarnings("unchecked")
@Override
public Deserializer<T> deserializer() {
// TODO Auto-generated method stub
Object obj = inner.deserializer();
return (Deserializer<T>) obj;
}
}
用作默認(rèn)流配置:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
覆蓋默認(rèn)值:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
添加回答
舉報(bào)