2 回答

TA貢獻(xiàn)1830條經(jīng)驗(yàn) 獲得超3個(gè)贊
以下片段對(duì)我有用。你可以試試這個(gè)。評(píng)論里有解釋。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
consumer.poll(Duration.ofSeconds(10));
consumer.assignment().forEach(System.out::println);
AtomicLong maxTimestamp = new AtomicLong();
AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();
// get the last offsets for each partition
consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
System.out.println("offset: "+offset);
// seek to the last offset of each partition
consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);
// poll to get the last record in each partition
consumer.poll(Duration.ofSeconds(10)).forEach(record -> {
// the latest record in the 'topic' is the one with the highest timestamp
if (record.timestamp() > maxTimestamp.get()) {
maxTimestamp.set(record.timestamp());
latestRecord.set(record);
}
});
});
System.out.println(latestRecord.get());

TA貢獻(xiàn)2011條經(jīng)驗(yàn) 獲得超2個(gè)贊
您必須使用每個(gè)分區(qū)中的最新消息,然后在客戶端進(jìn)行比較(使用消息上的時(shí)間戳,如果包含它)。原因是 Kafka 不保證分區(qū)間的順序。在分區(qū)內(nèi),您可以確定偏移量最大的消息是最新推送到該分區(qū)的消息。
添加回答
舉報(bào)