我正在嘗試使用 Golang 客戶端測試生產(chǎn)者將消息寫入 kafka 集群上的主題。package mainimport (? ? "fmt"? ? "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() {? ? p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})? ? if err != nil {? ? ? ? panic(err)? ? }? ? defer p.Close()? ? // Delivery report handler for produced messages? ? go func() {? ? ? ? for e := range p.Events() {? ? ? ? ? ? switch ev := e.(type) {? ? ? ? ? ? case *kafka.Message:? ? ? ? ? ? ? ? if ev.TopicPartition.Error != nil {? ? ? ? ? ? ? ? ? ? fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)? ? ? ? ? ? ? ? } else {? ? ? ? ? ? ? ? ? ? fmt.Printf("Delivered message to %v\n", ev.TopicPartition)? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }()? ? // Produce messages to topic (asynchronously)? ? topic := "test"? ? for _, word := range []string{"test message"} {? ? ? ? p.Produce(&kafka.Message{? ? ? ? ? ? TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},? ? ? ? ? ? Value:? ? ? ? ? []byte(word),? ? ? ? }, nil)? ? }? ? // Wait for message deliveries before shutting down? ? p.Flush(15 * 1000)}我在控制臺上收到消息-消費(fèi)者沒有問題。然后,我嘗試做同樣的事情,只是使用我的遠(yuǎn)程 kafka 集群主題(注意我也嘗試過不使用字符串中的端口):p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092"})它打印以下錯(cuò)誤:Delivery failed: test[0]@end(Broker: Not enough in-sync replicas)不過控制臺生產(chǎn)者沒有任何問題:./bin/kafka-console-producer.sh --broker-list HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test>proving that this works控制臺消費(fèi)者收到它:bin/kafka-console-consumer.sh --bootstrap-server HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test --from-beginning?proving that this works我做的最后一件事是檢查該主題有多少個(gè)同步副本。如果我沒看錯(cuò)的話,最小值應(yīng)該是 2,而且有 3 個(gè)。我還有什么可以研究的想法嗎?
2 回答

莫回?zé)o
TA貢獻(xiàn)1865條經(jīng)驗(yàn) 獲得超7個(gè)贊
您有min.insync.replicas=2
,但該主題只有一個(gè)副本。
我相信控制臺制作人只將該屬性設(shè)置為 1
有 3 個(gè)
其實(shí)只有一個(gè)。這是代理 ID 3。如果實(shí)際上有三個(gè)副本,您會看到總共三個(gè)單獨(dú)的數(shù)字作為 ISR

慕婉清6462132
TA貢獻(xiàn)1804條經(jīng)驗(yàn) 獲得超2個(gè)贊
或者,如果您使用 AWS 的 MSK,當(dāng)每個(gè)代理的 EBS 存儲完全用于其中一個(gè)代理時(shí),可能會出現(xiàn)這種情況,而可能的解決方法是增加其存儲。
- 2 回答
- 0 關(guān)注
- 195 瀏覽
添加回答
舉報(bào)
0/150
提交
取消