第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

將 KafkaAvroDeserializer 與 Alpakka 結(jié)合使用

將 KafkaAvroDeserializer 與 Alpakka 結(jié)合使用

楊魅力 2022-12-28 15:39:31
我有一個(gè) SchemaRegistry 和一個(gè) KafkaBroker,我使用 Avro v1.8.1 從中提取數(shù)據(jù)。對(duì)于反序列化,我一直在使用 Confluent 的KafkaAvroDeserializer。現(xiàn)在我打算重構(gòu)我的代碼以使用 Alpakka 提供的Elasticsearch API,但不幸的是這會(huì)破壞反序列化,因?yàn)樗鼤?huì)導(dǎo)致 NullPointerExceptions:線程“main”中的異常 org.apache.kafka.common.errors.SerializationException:在偏移量 0 處反序列化分區(qū) topic-0 的鍵/值時(shí)出錯(cuò)。如果需要,請(qǐng)尋找過去的記錄以繼續(xù)消費(fèi)。原因:org.apache.kafka.common.errors.SerializationException:反序列化 id 2 的 Avro 消息時(shí)出錯(cuò) 原因:io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)處的 java.lang.NullPointerException io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) 在 io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 在 org.apache.kafka.Deserialization.Deserializer.common.serializer在 org.apache.kafka.clients.consumer 中反序列化(Deserializer.java:58)。我一直在使用 Alpakka 的 ConsumerSettings API,如本例中所述:val system = ActorSystem.create();// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptionsSpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())    .withBootstrapServers(kafkaBootstrapServerUrl)    .withClientId(InetAddress.getLocalHost().getHostName())    .withGroupId("" + new Random().nextInt())    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")    .withStopTimeout(Duration.ofSeconds(5));
查看完整描述

1 回答

?
倚天杖

TA貢獻(xiàn)1828條經(jīng)驗(yàn) 獲得超3個(gè)贊

我認(rèn)為您需要拉到new KafkaAvroDeserializer()它自己的變量,然后調(diào)用該.configure()實(shí)例上的方法以傳入非空注冊(cè)表 URL。

然后將配置的實(shí)例傳入ConsumerSettings.create

FWIW,根據(jù)您的需要,Kafka Connect 可以很好地加載 Elasticsearch


查看完整回答
反對(duì) 回復(fù) 2022-12-28
  • 1 回答
  • 0 關(guān)注
  • 93 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)