Kafka的Offset位移
1. Offset的默认维护位置
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个Topic进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
2. 查看Kafka的Offset
_consumer_offsets
为Kafka 中的 topic,那就可以通过消费者进行消费查看。
2.1 修改Kafka配置
默认Kafka不对外提供系统主题查看,为了查看系统主题数据,需要修改config/consumer.properties
文件做以下配置:
[jack@hadoop107 kafka-3.6.1]$ vi config/consumer.properties
## 末尾加上一行,默认值为true
exclude.internal.topics=false
[jack@hadoop107 kafka-3.6.1]$ scp config/consumer.properties jack@192.168.101.105:/opt/module/kafka-3.6.1/config/
[jack@hadoop107 kafka-3.6.1]$ scp config/consumer.properties jack@192.168.101.106:/opt/module/kafka-3.6.1/config/
2.2 查看消费者消费主题__consumer_offsets
[jack@hadoop107 kafka-3.6.1]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop105:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
.....
[test888,testtopic,0]::OffsetAndMetadata(offset=2973, leaderEpoch=Optional[4], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,5]::OffsetAndMetadata(offset=1222, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,2]::OffsetAndMetadata(offset=2604, leaderEpoch=Optional[5], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,1]::OffsetAndMetadata(offset=2940, leaderEpoch=Optional[6], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,6]::OffsetAndMetadata(offset=936, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710754624583, expireTimestamp=None)
.....
__consumer_offsets的消费情况在末尾仍然不断打印,可以看到之前历史消费的消息记录。
3. 自动提交Offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。 自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
3.1 消费者代码应用自动提交Offset
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test999");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 默认就是true
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 设置1s
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
4. 手动提交Offset
虽然自动提交Offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交Offset的API。
手动提交Offset的方法有两种:分别是commitSync
(同步提交)和commitAsync
(异步提交)。
两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;
不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
4.1 同步提交Offset
由于同步提交Offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
4.2 同步提交代码示例
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交 offset
consumer.commitSync();
}
}
}
4.3 异步提交Offset
虽然同步提交Offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交Offset的方式。
4.4 异步提交Offset的示例
public class CustomConsumerByHandAsync {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//3. 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 异步提交 offset
consumer.commitAsync();
}
}
}
5. 指定Offset消费
配置项auto.offset.reset=earliest|latest|none
, 默认是 latest。
- earliest:自动将偏移量重置为最早的偏移量,和
kafka-console-consumer.sh
命令行客户端参数--from-beginning
语义相同。 - latest(默认值):自动将偏移量重置为最新偏移量。
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
5.1 任意指定Offset位移开始消费
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test8989");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("testtopic");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size()==0){
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定Offset 从 1700 的位置开始消费
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition, 1700);
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
5.2 指定时间消费
应用场景: 在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test8989");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> partitionLongHashMap = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
partitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(partitionLongHashMap);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}