3 回答

TA貢獻(xiàn)1818條經(jīng)驗(yàn) 獲得超11個(gè)贊
您是否嘗試過使用regular expressions.
例子 :
consume, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "server",
})
err = consume.SubscribeTopics([]string{"^.*_mypattern"}, nil)
來源:https ://github.com/confluentinc/confluent-kafka-go/issues/96
在初始化 consumer 時(shí)也嘗試設(shè)置此選項(xiàng)metadata.max.age.ms。這將刷新元數(shù)據(jù)以查看是否有任何新主題可用。

TA貢獻(xiàn)1827條經(jīng)驗(yàn) 獲得超4個(gè)贊
該邏輯的代碼片段會(huì)有所幫助。
您可以使用Mongo Change Streams來做到這一點(diǎn)。
例如,要查看集合的更改,請(qǐng)使用以下Collection.Watch()方法 -
var collection *mongo.Collection
// specify a pipeline that will only match "insert" events
// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents
matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)
if err != nil {
log.Fatal(err)
}
// print out all change stream events in the order they're received
// see the mongo.ChangeStream documentation for more examples of using change streams
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
// NewConsumer
}
然后創(chuàng)建一個(gè)新的消費(fèi)者或者.SubscribeTopics()在你更新你的集合并且它符合你的標(biāo)準(zhǔn)時(shí)調(diào)用

TA貢獻(xiàn)1851條經(jīng)驗(yàn) 獲得超5個(gè)贊
如果需要,消費(fèi)者可以使用來自動(dòng)態(tài)主題的消息。我認(rèn)為您可能使用 Redis PubSub 而不是 Kafka。
因?yàn)楫?dāng)您需要從最近創(chuàng)建的主題中消費(fèi)時(shí),消費(fèi)者必須重新連接到代理,并且在頻繁添加新主題時(shí)成本很高。
我假設(shè)新主題描述了一個(gè)聊天室/組。如果正確,Redis PubSub Subscription 比 Kafka Consumer 輕。您可以將頻道用作聊天室/群組。
或者您可以同時(shí)使用 Kafka 和 Redis PubSub,在從 Kafka 消費(fèi)創(chuàng)建的房間/組事件后,將其設(shè)置為 Redis PubSub 的頻道,您就可以開始訂閱了。
- 3 回答
- 0 關(guān)注
- 232 瀏覽
添加回答
舉報(bào)