第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

Fetch Offset Range in Kafka with Kafka Client API

標(biāo)簽:
大數(shù)據(jù)

有的时候需要检出Kafka中某个topic的所有partition的offset range. 比如Spark Streaming在指定fromOffset时,如果不校验边界,可能会出错。Kafka提供了命令来check。这里提供一个基于Java API的方式

代码如下

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config);
consumer.subscribe(topics);
ConsumerRecords<String, byte[]> records = consumer.poll(1000);return records.partitions().parallelStream().map(topicPartition -> {
        consumer.seekToBeginning(Collections.singletonList(topicPartition));        long offset = consumer.position(topicPartition);        return new TopicPartitionInfo(topicPartition.topic(), topicPartition.partition(), offset);
}).collect(Collectors.toList());

完整代码:See Here

依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version></dependency>

2018.01.09更新

上述代码中,如果poll(1000)获得的records并没有包含所有的partition的record,records.partitions()所获取的并非为全部的该topic的partition。

records.partitions()只会返回这段records中所含有的partition。

因此,你可能需要

Map<TopicPartition, Long> fromOffsets = new HashMap<>();//do fill your fromOffsets with your own local offset-store hereKafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaParams);
consumer.subscribe(topics);
consumer.poll(100);for (TopicPartition topicPartition : fromOffsets.keySet()) {
    consumer.seekToBeginning(Collections.singletonList(topicPartition));    long offset = consumer.position(topicPartition);    long consumedOffset = fromOffsets.getOrDefault(topicPartition, 0L);    if (offset > consumedOffset) {        log.warn("At partition {}, our system has consumed to {} but we can start only from {} because of retention expiration.", topicPartition.partition(), consumedOffset, offset);        log.warn("At partition {}, start offset has been adjusted to {}", topicPartition.partition(), offset);
        fromOffsets.put(topicPartition, offset);
    }
}
consumer.unsubscribe();



作者:即墨灯火
链接:https://www.jianshu.com/p/1c6f1e79ac47


點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺(jué)得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫(xiě)下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消