第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

使用 kafka 流根據(jù)消息密鑰將消息發(fā)送到主題

使用 kafka 流根據(jù)消息密鑰將消息發(fā)送到主題

躍然一笑 2023-08-04 15:23:09
我希望能夠根據(jù)消息鍵的鍵將 Kafkastream 中的所有記錄發(fā)送到不同的主題。前任。Kafka 中的流包含名稱作為鍵,記錄作為值。我想根據(jù)記錄的鍵將這些記錄分散到不同的主題數(shù)據(jù):(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),預(yù)期topic1 :名稱: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})主題2:sean->(sean -> {seansRecord})主題3:瑪麗 ->(瑪麗 -> {marysRecord})下面是我現(xiàn)在執(zhí)行此操作的方式,但由于名稱列表是 hudge,所以速度很慢。另外,即使記錄了幾個(gè)名字,我也需要遍歷整個(gè)列表請(qǐng)?zhí)岢鲂迯?fù)建議 for( String name : names )     {         recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);     }
查看完整描述

3 回答

?
神不在的星期二

TA貢獻(xiàn)1963條經(jīng)驗(yàn) 獲得超6個(gè)贊

我認(rèn)為你應(yīng)該使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)函數(shù)。它使您能夠計(jì)算每條消息的主題名稱。

示例代碼:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);


查看完整回答
反對(duì) 回復(fù) 2023-08-04
?
墨色風(fēng)雨

TA貢獻(xiàn)1853條經(jīng)驗(yàn) 獲得超6個(gè)贊

如果您需要為每個(gè)用戶生成聚合數(shù)據(jù),則無(wú)需為每個(gè)用戶寫(xiě)入單獨(dú)的主題。您最好在源流上編寫(xiě)聚合。這樣,您就不會(huì)最終得到每個(gè)鍵一個(gè)主題,但您仍然可以獨(dú)立地對(duì)每個(gè)用戶運(yùn)行操作。

Serde<UserRecord>?recordSerde?=?...
KStream<Stream,?UserAggregate>?aggregateByName?=?recordsByName
???.groupByKey(Grouped.with(Serdes.String(),?recordSerde))
???.aggregate(...)
???.toStream()

這種方法將擴(kuò)展到數(shù)百萬(wàn)用戶,這是您目前無(wú)法通過(guò)每個(gè)用戶一個(gè)主題的方法實(shí)現(xiàn)的。


查看完整回答
反對(duì) 回復(fù) 2023-08-04
?
慕田峪9158850

TA貢獻(xiàn)1794條經(jīng)驗(yàn) 獲得超8個(gè)贊

我想你正在尋找的是KStream#branch。

以下未經(jīng)測(cè)試,但顯示了總體思路

// get a list of predicates to branch a topic on

final List<String> names = Arrays.asList("jhon", "sean", "mary");

final Predicate[] predicates = names.stream()

? ? .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))

? ? .toArray(Predicate[]::new);


// example input

final KStream<Object, Object> stream = new StreamsBuilder().stream("names");


// split the topic

KStream<String, Object>[] branches = stream.branch(predicates);

for (int i = 0; i < names.size(); i++) {

? ? branches[i].to(names.get(i));

}


// KStream branches[0] contains all records whose keys are "jhon"

// KStream branches[1] contains all records whose keys are "sean"

...


查看完整回答
反對(duì) 回復(fù) 2023-08-04
  • 3 回答
  • 0 關(guān)注
  • 201 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)