第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

Kafka 流聚合拋出異常

Kafka 流聚合拋出異常

天涯盡頭無(wú)女友 2024-01-05 14:48:53
這是我的 Kafka 流代碼,它使用滑動(dòng)窗口對(duì)時(shí)間窗口中的所有整數(shù)數(shù)據(jù)求和。public class KafkaWindowingLIS {    public static void main(String[] args) {        Properties config = new Properties();        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        Integer uid = 1;        long tenSeconds = 1000 * 10;        StreamsBuilder builder = new StreamsBuilder();        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");        KStream<Integer, Integer> integerKStream = dataStream                .filter((key, val) -> {               //Filter only numbers from Stream                    try {                        Integer.parseInt(val);                        return true;                    } catch (NumberFormatException e) {                        return false;                    }                })                .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream                .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer()))     //Killed my time                .windowedBy(TimeWindows.of(tenSeconds));        timeWindowedKStream.aggregate(                () -> 0,                (key, value, aggregate) -> value + aggregate)                .toStream().print(Printed.toSysOut());        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);//        kafkaStreams.cleanUp();        kafkaStreams.start();        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));    }}
查看完整描述

1 回答

?
鳳凰求蠱

TA貢獻(xiàn)1825條經(jīng)驗(yàn) 獲得超4個(gè)贊

因?yàn)槟褂?code>aggregate(),所以您需要通過(guò)顯式設(shè)置輸出值 serde aggregate(..., Materialized.with(...))。輸出值類(lèi)型可能與輸入值類(lèi)型不同,因此輸入值 serde無(wú)法重用。(由于 Java 類(lèi)型擦除,Kafka Streams 不知道類(lèi)型實(shí)際上沒(méi)有改變......)因此,Kafka Streams 從配置中回退到默認(rèn)的 serde。

作為替代方案,您可以使用reduce()而不是aggregate解決問(wèn)題。輸出類(lèi)型reduce()與輸入類(lèi)型相同,因此輸入值 serde 可以用作輸出值。


查看完整回答
反對(duì) 回復(fù) 2024-01-05
  • 1 回答
  • 0 關(guān)注
  • 175 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)