天涯盡頭無(wú)女友
2024-01-05 14:48:53
這是我的 Kafka 流代碼,它使用滑動(dòng)窗口對(duì)時(shí)間窗口中的所有整數(shù)數(shù)據(jù)求和。public class KafkaWindowingLIS { public static void main(String[] args) { Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); Integer uid = 1; long tenSeconds = 1000 * 10; StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> dataStream = builder.stream("kafka-windowing-lis"); KStream<Integer, Integer> integerKStream = dataStream .filter((key, val) -> { //Filter only numbers from Stream try { Integer.parseInt(val); return true; } catch (NumberFormatException e) { return false; } }) .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val))); TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer())) //Killed my time .windowedBy(TimeWindows.of(tenSeconds)); timeWindowedKStream.aggregate( () -> 0, (key, value, aggregate) -> value + aggregate) .toStream().print(Printed.toSysOut()); KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);// kafkaStreams.cleanUp(); kafkaStreams.start(); Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close)); }}
1 回答

鳳凰求蠱
TA貢獻(xiàn)1825條經(jīng)驗(yàn) 獲得超4個(gè)贊
因?yàn)槟褂?code>aggregate(),所以您需要通過(guò)顯式設(shè)置輸出值 serde aggregate(..., Materialized.with(...))
。輸出值類(lèi)型可能與輸入值類(lèi)型不同,因此輸入值 serde無(wú)法重用。(由于 Java 類(lèi)型擦除,Kafka Streams 不知道類(lèi)型實(shí)際上沒(méi)有改變......)因此,Kafka Streams 從配置中回退到默認(rèn)的 serde。
作為替代方案,您可以使用reduce()
而不是aggregate
解決問(wèn)題。輸出類(lèi)型reduce()
與輸入類(lèi)型相同,因此輸入值 serde 可以用作輸出值。
添加回答
舉報(bào)
0/150
提交
取消