嘗試使用 confluent_kafka.AvroConsumer 來消費來自給定時間戳的消息。if flag: # creating a list topic_partitons_to_search = list( map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1))) print("Searching for offsets with %s" % topic_partitons_to_search) offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0) print("offsets_for_times results: %s" % offsets) for x in offsets: c.seek(x) flag=False 控制臺返回這個Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]{'name': 'Hello'}{'name': 'Hello'}{'name': 'Hello1'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Offset 8'}{'name': 'Offset 9'}{'name': 'Offset 10'}{'name': 'Offset 11'}{'name': 'New'} 這些是我在 my_topic2 的分區(qū) 0 中的所有消息(分區(qū) 1 中沒有任何消息),我們應(yīng)該什么也得不到,因為我們沒有從當(dāng)前時間(time.time())生成的消息。然后我希望能夠使用類似的東西time.time() - 60000來獲取過去 60000 毫秒內(nèi)的所有消息
1 回答

寶慕林4294392
TA貢獻2021條經(jīng)驗 獲得超8個贊
Pythons time.time() 返回自紀(jì)元以來的秒數(shù),offsets_for_times 使用紀(jì)元的毫秒數(shù),所以當(dāng)我發(fā)送秒數(shù)時,它計算的日期比今天早得多,這意味著我們應(yīng)該包括所有我的抵消。
添加回答
舉報
0/150
提交
取消