3 回答

TA貢獻(xiàn)1963條經(jīng)驗(yàn) 獲得超6個(gè)贊
我認(rèn)為你應(yīng)該使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)
函數(shù)。它使您能夠計(jì)算每條消息的主題名稱。
示例代碼:
final KStream<String, String> stream = ???; stream.to((key, value, recordContext) -> key);

TA貢獻(xiàn)1853條經(jīng)驗(yàn) 獲得超6個(gè)贊
如果您需要為每個(gè)用戶生成聚合數(shù)據(jù),則無(wú)需為每個(gè)用戶寫(xiě)入單獨(dú)的主題。您最好在源流上編寫(xiě)聚合。這樣,您就不會(huì)最終得到每個(gè)鍵一個(gè)主題,但您仍然可以獨(dú)立地對(duì)每個(gè)用戶運(yùn)行操作。
Serde<UserRecord>?recordSerde?=?... KStream<Stream,?UserAggregate>?aggregateByName?=?recordsByName ???.groupByKey(Grouped.with(Serdes.String(),?recordSerde)) ???.aggregate(...) ???.toStream()
這種方法將擴(kuò)展到數(shù)百萬(wàn)用戶,這是您目前無(wú)法通過(guò)每個(gè)用戶一個(gè)主題的方法實(shí)現(xiàn)的。

TA貢獻(xiàn)1794條經(jīng)驗(yàn) 獲得超8個(gè)贊
我想你正在尋找的是KStream#branch
。
以下未經(jīng)測(cè)試,但顯示了總體思路
// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
? ? .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
? ? .toArray(Predicate[]::new);
// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");
// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
? ? branches[i].to(names.get(i));
}
// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...
添加回答
舉報(bào)