我想做的是:使用數(shù)字主題(Long‘s)中的記錄匯總(計(jì)數(shù))每5秒窗口的值將最終聚合結(jié)果發(fā)送到另一個(gè)主題我的代碼看起來如下:KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");看起來一切都像預(yù)期的那樣工作,但是聚合被發(fā)送到每個(gè)傳入記錄的目標(biāo)主題。我的問題是如何只發(fā)送每個(gè)窗口的最終聚合結(jié)果?
如何發(fā)送時(shí)間窗口KTable的最終Kafka流聚合結(jié)果?
慕神8447489
2019-07-13 15:03:38