第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

有沒有辦法從 Kafka 主題獲取最后一條消息?

有沒有辦法從 Kafka 主題獲取最后一條消息?

瀟湘沐 2023-06-28 15:28:26
我有一個(gè)具有多個(gè)分區(qū)的 Kafka 主題,我想知道 Java 中是否有一種方法可以獲取該主題的最后一條消息。我不關(guān)心分區(qū),我只想獲取最新消息。我已經(jīng)嘗試過@KafkaListener,但它僅在主題更新時(shí)才獲取消息。如果應(yīng)用程序打開后沒有發(fā)布任何內(nèi)容,則不會(huì)返回任何內(nèi)容。也許傾聽者根本就不是解決問題的正確方法?
查看完整描述

2 回答

?
牛魔王的故事

TA貢獻(xiàn)1830條經(jīng)驗(yàn) 獲得超3個(gè)贊

以下片段對(duì)我有用。你可以試試這個(gè)。評(píng)論里有解釋。


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(topic));


        consumer.poll(Duration.ofSeconds(10));


        consumer.assignment().forEach(System.out::println);


        AtomicLong maxTimestamp = new AtomicLong();

        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();


        // get the last offsets for each partition

        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {

            System.out.println("offset: "+offset);


            // seek to the last offset of each partition

            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);


            // poll to get the last record in each partition

            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {


                // the latest record in the 'topic' is the one with the highest timestamp

                if (record.timestamp() > maxTimestamp.get()) {

                    maxTimestamp.set(record.timestamp());

                    latestRecord.set(record);

                }

            });

        });

        System.out.println(latestRecord.get());


查看完整回答
反對(duì) 回復(fù) 2023-06-28
?
森林海

TA貢獻(xiàn)2011條經(jīng)驗(yàn) 獲得超2個(gè)贊

您必須使用每個(gè)分區(qū)中的最新消息,然后在客戶端進(jìn)行比較(使用消息上的時(shí)間戳,如果包含它)。原因是 Kafka 不保證分區(qū)間的順序。在分區(qū)內(nèi),您可以確定偏移量最大的消息是最新推送到該分區(qū)的消息。



查看完整回答
反對(duì) 回復(fù) 2023-06-28
  • 2 回答
  • 0 關(guān)注
  • 394 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)