第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

如何從頭訂閱

如何從頭訂閱

Go
ITMISS 2023-06-12 09:33:26
我正在嘗試使用 GroupId 編寫一個(gè) Kafka Consumer foo,它訂閱某個(gè)主題并從頭開始讀?。词怪坝衅屏浚N覈L試與重新平衡回調(diào)一起使用Subscribe,但它似乎從未被調(diào)用(已設(shè)置設(shè)置go.application)。有什么例子可以使這項(xiàng)工作成功嗎?
查看完整描述

2 回答

?
翻過高山走不出你

TA貢獻(xiàn)1875條經(jīng)驗(yàn) 獲得超3個(gè)贊

你可能只需要將你的值設(shè)置?auto.offset.resetkafka.OffsetBeginning.String()

package main


/**

?* Copyright 2016 Confluent Inc.

?*/


// consumer_example implements a consumer using the non-channel Poll() API

// to retrieve messages and events.


import (

? ? "fmt"

? ? "github.com/confluentinc/confluent-kafka-go/kafka"

? ? "os"

? ? "os/signal"

? ? "syscall"

)


func main() {


? ? broker := "YOUR_BROKER"

? ? group := "YOUR_GROUP"

? ? topics := "YOUR_TOPICS"

? ? sigchan := make(chan os.Signal, 1)

? ? signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)


? ? c, err := kafka.NewConsumer(&kafka.ConfigMap{

? ? ? ? "bootstrap.servers":? broker,

? ? ? ? "group.id":? ? ? ? ? ?group,

? ? ? ? "session.timeout.ms": 6000,

? ? ? ? "auto.offset.reset":? kafka.OffsetBeginning.String()})


? ? if err != nil {

? ? ? ? fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)

? ? ? ? os.Exit(1)

? ? }


? ? fmt.Printf("Created Consumer %v\n", c)


? ? err = c.SubscribeTopics(topics, nil)


? ? run := true


? ? for run == true {

? ? ? ? select {

? ? ? ? case sig := <-sigchan:

? ? ? ? ? ? fmt.Printf("Caught signal %v: terminating\n", sig)

? ? ? ? ? ? run = false

? ? ? ? default:

? ? ? ? ? ? ev := c.Poll(100)

? ? ? ? ? ? if ev == nil {

? ? ? ? ? ? ? ? continue

? ? ? ? ? ? }


? ? ? ? ? ? switch e := ev.(type) {

? ? ? ? ? ? case *kafka.Message:

? ? ? ? ? ? ? ? fmt.Printf("%% Message on %s:\n%s\n",

? ? ? ? ? ? ? ? ? ? e.TopicPartition, string(e.Value))

? ? ? ? ? ? ? ? if e.Headers != nil {

? ? ? ? ? ? ? ? ? ? fmt.Printf("%% Headers: %v\n", e.Headers)

? ? ? ? ? ? ? ? }

? ? ? ? ? ? case kafka.Error:

? ? ? ? ? ? ? ? // Errors should generally be considered as informational, the client will try to automatically recover

? ? ? ? ? ? ? ? fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)

? ? ? ? ? ? default:

? ? ? ? ? ? ? ? fmt.Printf("Ignored %v\n", e)

? ? ? ? ? ? }

? ? ? ? }

? ? }


? ? fmt.Printf("Closing consumer\n")

? ? c.Close()

}


查看完整回答
反對(duì) 回復(fù) 2023-06-12
?
慕碼人2483693

TA貢獻(xiàn)1860條經(jīng)驗(yàn) 獲得超9個(gè)贊

我們現(xiàn)在設(shè)置enable.auto.commitfalse. 這樣,就不會(huì)存儲(chǔ)偏移量,我們每次運(yùn)行都從頭開始消費(fèi)。



查看完整回答
反對(duì) 回復(fù) 2023-06-12
  • 2 回答
  • 0 關(guān)注
  • 203 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)