第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Python Kafka 客戶端 - 沒有錯誤但無法正常工作

Python Kafka 客戶端 - 沒有錯誤但無法正常工作

桃花長相依 2021-07-09 14:48:42
我在 python 中運行 confluent_kafka 客戶端。目前,我在嘗試生成然后消費消息時沒有收到任何錯誤,但問題是生產(chǎn)者說它成功了,但消費者找不到任何消息。我創(chuàng)建了一個主題,這是我構(gòu)建的正在使用的類:from confluent_kafka import Producer, Consumerfrom config import configimport jsonclass Kafka:    """    Kafka Handler.    """    def __init__(self, kafka_brokers_sasl, api_key):        """        Arguments:            kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)            api_key {str} -- Kafka Api Key        """        self.driver_options = {            'bootstrap.servers': kafka_brokers_sasl,            'sasl.mechanisms': 'PLAIN',            'security.protocol': 'SASL_SSL',            'sasl.username': 'token',            'sasl.password': api_key,            'log.connection.close' : False,            #'debug': 'all'        }        self.producer_options = {            'client.id': 'kafka-python-console-sample-producer'        }        self.producer_options.update(self.driver_options)        self.consumer_options = {            'client.id': 'kafka-python-console-sample-consumer',            'group.id': 'kafka-python-console-sample-group'        }        self.consumer_options.update(self.driver_options)        self.running = None    def stop(self):        self.running = False    def delivery_report(self, err, msg):        """ Called once for each message produced to indicate delivery result.            Triggered by poll() or flush(). """        if err is not None:            print('Message delivery failed: {}'.format(err))        else:            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))    def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic        p = Producer(self.producer_options)        print("Running?")
查看完整描述

2 回答

?
揚帆大魚

TA貢獻1799條經(jīng)驗 獲得超9個贊

我不是 Python 方面的專家,但看起來您是在生成消息后才開始使用的?

kafka.produce(config['kafka']['topic'], json.dumps(mock)) kafka.consume(config['kafka']['topic'])

您需要在調(diào)用生產(chǎn)函數(shù)之前調(diào)用消耗函數(shù),因為當您啟動一個新消費者時,該消費者的默認偏移量將是最新的。因此,例如,如果您在偏移量 5 處生成了一條消息,然后啟動了一個新的消費者,則默認情況下,您的消費者偏移量將在偏移量 6 處,并且不會消耗在偏移量 5 處生成的消息。

解決方案是要么在產(chǎn)生任何東西之前開始消費,要么將消費者配置設(shè)置為從偏移量的開始消費消息。這可以通過設(shè)置auto.offset.reset來完成,earliest但我認為第一個解決方案更簡單。


查看完整回答
反對 回復(fù) 2021-07-13
  • 2 回答
  • 0 關(guān)注
  • 573 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號