4 回答

TA貢獻(xiàn)1877條經(jīng)驗(yàn) 獲得超6個(gè)贊
主要思想是計(jì)算主題的每個(gè)分區(qū)中有多少條消息并對所有這些數(shù)字求和。結(jié)果是有關(guān)該主題的消息總數(shù)。我使用confluence_kafka作為主庫。
from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor
consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})
def get_partition_size(topic_name: str, partition_key: int):
topic_partition = TopicPartition(topic_name, partition_key)
low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
partition_size = high_offset - low_offset
return partition_size
def get_topic_size(topic_name: str):
topic = consumer.list_topics(topic=topic_name)
partitions = topic.topics[topic_name].partitions
workers, max_workers = [], len(partitions) or 1
with ThreadPoolExecutor(max_workers=max_workers) as e:
for partition_key in list(topic.topics[topic_name].partitions.keys()):
job = e.submit(get_partition_size, topic_name, partition_key)
workers.append(job)
topic_size = sum([w.result() for w in workers])
return topic_size
print(get_topic_size('my.kafka.topic'))

TA貢獻(xiàn)1817條經(jīng)驗(yàn) 獲得超14個(gè)贊
一種解決方案是您可以向所有分區(qū)各添加一條消息并獲取最后的偏移量。根據(jù)偏移量,您可以計(jì)算到目前為止發(fā)送到主題的消息總數(shù)。
但這不是正確的做法。你不知道消費(fèi)者已經(jīng)消費(fèi)了多少條消息,以及kafka刪除了多少條消息。唯一的方法是您可以消費(fèi)消息并計(jì)算數(shù)量。

TA貢獻(xiàn)1802條經(jīng)驗(yàn) 獲得超4個(gè)贊
沒有特定的 API 來計(jì)算某個(gè)主題的記錄數(shù)。您需要消費(fèi)并計(jì)算從 kafka 消費(fèi)者收到的記錄數(shù)。

TA貢獻(xiàn)1790條經(jīng)驗(yàn) 獲得超9個(gè)贊
我無法使用 來實(shí)現(xiàn)此操作kafka-python,但我可以使用confluent-kafka庫相當(dāng)輕松地完成此操作:
from confluent_kafka import Consumer
topic = "test_topic"
broker = "localhost:9092"
def get_count():
consumer = Consumer({
'bootstrap.servers': broker,
'group.id': 'my-group',
'auto.offset.reset': 'earliest',
})
consumer.subscribe([topic])
total_message_count = 0
while True:
msg = consumer.poll(1.0)
if msg is None:
print("No more messages")
break
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
total_message_count = total_message_count + 1
print('Received message {}: {}'.format(total_message_count,
msg.value().decode('utf-8')))
consumer.close()
print(total_message_count)
添加回答
舉報(bào)