我在 python 中運行 confluent_kafka 客戶端。目前,我在嘗試生成然后消費消息時沒有收到任何錯誤,但問題是生產(chǎn)者說它成功了,但消費者找不到任何消息。我創(chuàng)建了一個主題,這是我構(gòu)建的正在使用的類:from confluent_kafka import Producer, Consumerfrom config import configimport jsonclass Kafka: """ Kafka Handler. """ def __init__(self, kafka_brokers_sasl, api_key): """ Arguments: kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces) api_key {str} -- Kafka Api Key """ self.driver_options = { 'bootstrap.servers': kafka_brokers_sasl, 'sasl.mechanisms': 'PLAIN', 'security.protocol': 'SASL_SSL', 'sasl.username': 'token', 'sasl.password': api_key, 'log.connection.close' : False, #'debug': 'all' } self.producer_options = { 'client.id': 'kafka-python-console-sample-producer' } self.producer_options.update(self.driver_options) self.consumer_options = { 'client.id': 'kafka-python-console-sample-consumer', 'group.id': 'kafka-python-console-sample-group' } self.consumer_options.update(self.driver_options) self.running = None def stop(self): self.running = False def delivery_report(self, err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic p = Producer(self.producer_options) print("Running?")
2 回答

揚帆大魚
TA貢獻1799條經(jīng)驗 獲得超9個贊
我不是 Python 方面的專家,但看起來您是在生成消息后才開始使用的?
kafka.produce(config['kafka']['topic'], json.dumps(mock))
kafka.consume(config['kafka']['topic'])
您需要在調(diào)用生產(chǎn)函數(shù)之前調(diào)用消耗函數(shù),因為當您啟動一個新消費者時,該消費者的默認偏移量將是最新的。因此,例如,如果您在偏移量 5 處生成了一條消息,然后啟動了一個新的消費者,則默認情況下,您的消費者偏移量將在偏移量 6 處,并且不會消耗在偏移量 5 處生成的消息。
解決方案是要么在產(chǎn)生任何東西之前開始消費,要么將消費者配置設(shè)置為從偏移量的開始消費消息。這可以通過設(shè)置auto.offset.reset
來完成,earliest
但我認為第一個解決方案更簡單。
添加回答
舉報
0/150
提交
取消