第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會有你想問的

如何使用kafka-python計(jì)算主題中的記錄(消息)數(shù)量

如何使用kafka-python計(jì)算主題中的記錄(消息)數(shù)量

楊魅力 2023-10-18 21:40:47
正如標(biāo)題中所說,我想在我的主題中獲得一些記錄,但我找不到使用 kafka-python 庫的解決方案。有人有什么主意嗎 ?
查看完整描述

4 回答

?
慕哥9229398

TA貢獻(xiàn)1877條經(jīng)驗(yàn) 獲得超6個(gè)贊

主要思想是計(jì)算主題的每個(gè)分區(qū)中有多少條消息并對所有這些數(shù)字求和。結(jié)果是有關(guān)該主題的消息總數(shù)。我使用confluence_kafka作為主庫。


from confluent_kafka import Consumer, TopicPartition

from concurrent.futures import ThreadPoolExecutor


consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})


def get_partition_size(topic_name: str, partition_key: int):

    topic_partition = TopicPartition(topic_name, partition_key)

    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)

    partition_size = high_offset - low_offset

    return partition_size


def get_topic_size(topic_name: str):

    topic = consumer.list_topics(topic=topic_name)

    partitions = topic.topics[topic_name].partitions

    workers, max_workers = [], len(partitions) or 1


    with ThreadPoolExecutor(max_workers=max_workers) as e:

        for partition_key in list(topic.topics[topic_name].partitions.keys()):

            job = e.submit(get_partition_size, topic_name, partition_key)

            workers.append(job)


    topic_size = sum([w.result() for w in workers])

    return topic_size


print(get_topic_size('my.kafka.topic'))


查看完整回答
反對 回復(fù) 2023-10-18
?
大話西游666

TA貢獻(xiàn)1817條經(jīng)驗(yàn) 獲得超14個(gè)贊

一種解決方案是您可以向所有分區(qū)各添加一條消息并獲取最后的偏移量。根據(jù)偏移量,您可以計(jì)算到目前為止發(fā)送到主題的消息總數(shù)。

但這不是正確的做法。你不知道消費(fèi)者已經(jīng)消費(fèi)了多少條消息,以及kafka刪除了多少條消息。唯一的方法是您可以消費(fèi)消息并計(jì)算數(shù)量。


查看完整回答
反對 回復(fù) 2023-10-18
?
慕虎7371278

TA貢獻(xiàn)1802條經(jīng)驗(yàn) 獲得超4個(gè)贊

沒有特定的 API 來計(jì)算某個(gè)主題的記錄數(shù)。您需要消費(fèi)并計(jì)算從 kafka 消費(fèi)者收到的記錄數(shù)。



查看完整回答
反對 回復(fù) 2023-10-18
?
富國滬深

TA貢獻(xiàn)1790條經(jīng)驗(yàn) 獲得超9個(gè)贊

我無法使用 來實(shí)現(xiàn)此操作kafka-python,但我可以使用confluent-kafka庫相當(dāng)輕松地完成此操作:


from confluent_kafka import Consumer


topic = "test_topic"

broker = "localhost:9092"


def get_count():

    consumer = Consumer({

        'bootstrap.servers': broker,

        'group.id': 'my-group',

        'auto.offset.reset': 'earliest',

    })


    consumer.subscribe([topic])


    total_message_count = 0

    while True:

        msg = consumer.poll(1.0)


        if msg is None:

            print("No more messages")

            break

        if msg.error():

            print("Consumer error: {}".format(msg.error()))

            continue


        total_message_count = total_message_count + 1

        print('Received message {}: {}'.format(total_message_count,     

msg.value().decode('utf-8')))


    consumer.close()


    print(total_message_count)


查看完整回答
反對 回復(fù) 2023-10-18
  • 4 回答
  • 0 關(guān)注
  • 238 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號