Fetch Offset Range in Kafka with Kafka Client API
標(biāo)簽:
大數(shù)據(jù)
有的时候需要检出Kafka中某个topic的所有partition的offset range. 比如Spark Streaming在指定fromOffset时,如果不校验边界,可能会出错。Kafka提供了命令来check。这里提供一个基于Java API的方式
代码如下
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config); consumer.subscribe(topics); ConsumerRecords<String, byte[]> records = consumer.poll(1000);return records.partitions().parallelStream().map(topicPartition -> { consumer.seekToBeginning(Collections.singletonList(topicPartition)); long offset = consumer.position(topicPartition); return new TopicPartitionInfo(topicPartition.topic(), topicPartition.partition(), offset); }).collect(Collectors.toList());
完整代码:See Here
依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version></dependency>
2018.01.09更新
上述代码中,如果poll(1000)
获得的records
并没有包含所有的partition的record,records.partitions()
所获取的并非为全部的该topic的partition。
即records.partitions()
只会返回这段records中所含有的partition。
因此,你可能需要
Map<TopicPartition, Long> fromOffsets = new HashMap<>();//do fill your fromOffsets with your own local offset-store hereKafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaParams); consumer.subscribe(topics); consumer.poll(100);for (TopicPartition topicPartition : fromOffsets.keySet()) { consumer.seekToBeginning(Collections.singletonList(topicPartition)); long offset = consumer.position(topicPartition); long consumedOffset = fromOffsets.getOrDefault(topicPartition, 0L); if (offset > consumedOffset) { log.warn("At partition {}, our system has consumed to {} but we can start only from {} because of retention expiration.", topicPartition.partition(), consumedOffset, offset); log.warn("At partition {}, start offset has been adjusted to {}", topicPartition.partition(), offset); fromOffsets.put(topicPartition, offset); } } consumer.unsubscribe();
作者:即墨灯火
链接:https://www.jianshu.com/p/1c6f1e79ac47
點(diǎn)擊查看更多內(nèi)容
為 TA 點(diǎn)贊
評(píng)論
評(píng)論
共同學(xué)習(xí),寫(xiě)下你的評(píng)論
評(píng)論加載中...
作者其他優(yōu)質(zhì)文章
正在加載中
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦