背景
在kafka的实际应用过程中,由于数据处理问题,需要对kafka中的数据进行重新消费。重新消费数据一般都是使用一个新的groupId,但默认的配置是earliest(当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 ),latest (当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 ).如果Kafka保存的数据量较小,通过earliest方式影响也不会太大,但当数据量比较大时,最好是能进行部分数据消费处理,一是提升处理问题的速度,二是减少资源浪费。本次先通过指定时间的方式来消费消息。
实现
Kafka 整个消息设计是非常的精妙,本案例是指定今天的凌晨为消息的开始时间
{
Properties props = new Properties();
props.put("bootstrap.servers", "");
props.put("group.id", "test2121");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
String topic = "";
// 获取topic的partition信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
long fetchDataTime = LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
}
consumer.assign(topicPartitions);
// 获取每个partition偏移量
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
System.out.println("开始设置各分区初始偏移量...");
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
// 如果设置的查询偏移量的时间点大于最大的索引记录时间,那么value就为空
offsetTimestamp = entry.getValue();
if (offsetTimestamp != null) {
int partition = entry.getKey().partition();
long timestamp = offsetTimestamp.timestamp();
long offset = offsetTimestamp.offset();
// 设置读取消息的偏移量
consumer.seek(entry.getKey(), offset);
}
}
System.out.println("设置各分区初始偏移量结束...");
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
JSONObject jsonObject = JSON.parseObject(value);
}
}
}
代码分析
Kafka 的消费位置是通过offset控制,实现方案是先根据时间获取对应的offset位置,然后进行消费。如果现在需要查看某个分区,某个位置的数据应该如何实现呢?
实际上实现方式与上面类似, 下篇文章在贴代码
版权声明:本文为tianshishangxin1原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。