我試圖在 Kafka Streams 的幫助下實現(xiàn)以下邏輯:聽一些來自主題的參考數(shù)據(jù),例如。ref-data-topic并StateStore從中創(chuàng)建一個全局。收聽來自另一個主題的消息,data-topic這些消息必須根據(jù) ref 數(shù)據(jù)進行驗證并發(fā)送到success或errors主題。下面是示例偽代碼:class SomeProcessor implements Processor<String, String> { private KeyValueStore<String, String> refDataStore; @Override public void init(final ProcessorContext context) { refDataStore = (KeyValueStore) context.getStateStore("ref-data-store"); } @Override public void process(String key String value) { Object refData = refDataStore.get("some_key"); // business logic here if(ok) { sendValueToTopic("success"); } else { sendValueToTopic("errors"); } }}或者實現(xiàn)這種理想行為的規(guī)范方法是什么?就像我現(xiàn)在想到的另一種方法是用驗證信息豐富處理器中的數(shù)據(jù),然后將所有內(nèi)容發(fā)送到一個主題中,讓客戶端處理例如validationStatus接收到的消息。雖然,我真的很想有一個包含兩個主題的解決方案,因為例如,在這種情況下,我可以使用 Kafka Connectsuccess topic直接鏈接到某個數(shù)據(jù)存儲并以error topic某種方式進行處理。同樣,在只有一個主題的方法中,我不知道如何實現(xiàn)這個“store_only_successfully_validated_entities”用例。有什么想法和建議嗎?
添加回答
舉報
0/150
提交
取消