我嘗試使用 kafka 生產(chǎn)者發(fā)送 java 字符串消息。字符串消息是從Java Spark JavaPairDStream中提取的。JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));String outTopics = "outputTopic";String broker = "localhost:9092";Properties properties = new Properties();properties.put("bootstrap.servers", broker);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> { ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2); System.out.println(message.key() + " : " + message.value()); //(1) producer.send(message).get(); //(2)}));(1) 行正確打印消息字符串。但是當(dāng)我使用 kafka 生產(chǎn)者發(fā)送這些消息(如(2)行)時(shí),它會(huì)拋出如下異常,我無(wú)法理解這個(gè)異常。我確認(rèn) kafaka 生產(chǎn)者消息是<String,String>通過(guò)第 (1) 行輸入的。但為什么第(2)行會(huì)拋出這個(gè)異常呢?我是否錯(cuò)過(guò)任何流程?
1 回答

動(dòng)漫人物
TA貢獻(xiàn)1815條經(jīng)驗(yàn) 獲得超10個(gè)贊
您需要為每個(gè) RDD 創(chuàng)建生產(chǎn)者。
RDD 分布在多個(gè)執(zhí)行器上,Producer 對(duì)象無(wú)法序列化以在它們之間共享
或者,查看結(jié)構(gòu)化流的文檔,您可以簡(jiǎn)單地執(zhí)行此操作以寫入主題;無(wú)需自己創(chuàng)建和發(fā)送記錄
stream.writeStream().format("kafka")...
請(qǐng)注意,如果目標(biāo)只是將一個(gè)主題映射到另一個(gè)主題,那么Kafka Streams API
比 Spark 更簡(jiǎn)單且開銷更少
添加回答
舉報(bào)
0/150
提交
取消