Skip to content

Kafka的Offset位移

1. Offset的默认维护位置

Alt text __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个Topic进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

2. 查看Kafka的Offset

_consumer_offsets为Kafka 中的 topic,那就可以通过消费者进行消费查看。

2.1 修改Kafka配置

默认Kafka不对外提供系统主题查看,为了查看系统主题数据,需要修改config/consumer.properties文件做以下配置:

sh
[jack@hadoop107 kafka-3.6.1]$ vi config/consumer.properties
## 末尾加上一行,默认值为true
exclude.internal.topics=false
[jack@hadoop107 kafka-3.6.1]$ scp config/consumer.properties jack@192.168.101.105:/opt/module/kafka-3.6.1/config/
[jack@hadoop107 kafka-3.6.1]$ scp config/consumer.properties jack@192.168.101.106:/opt/module/kafka-3.6.1/config/

2.2 查看消费者消费主题__consumer_offsets

sh
[jack@hadoop107 kafka-3.6.1]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop105:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
.....
[test888,testtopic,0]::OffsetAndMetadata(offset=2973, leaderEpoch=Optional[4], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,5]::OffsetAndMetadata(offset=1222, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,2]::OffsetAndMetadata(offset=2604, leaderEpoch=Optional[5], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,1]::OffsetAndMetadata(offset=2940, leaderEpoch=Optional[6], metadata=, commitTimestamp=1710754619635, expireTimestamp=None)
[test888,testtopic,6]::OffsetAndMetadata(offset=936, leaderEpoch=Optional[0], metadata=, commitTimestamp=1710754624583, expireTimestamp=None)
.....

__consumer_offsets的消费情况在末尾仍然不断打印,可以看到之前历史消费的消息记录。

3. 自动提交Offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
Alt text 自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

3.1 消费者代码应用自动提交Offset

java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test999");
    // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 默认就是true
    // 提交 offset 的时间周期 1000ms,默认 5s
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 设置1s

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}

4. 手动提交Offset

虽然自动提交Offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交Offset的API。
手动提交Offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;
不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 Alt text

4.1 同步提交Offset

由于同步提交Offset有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

4.2 同步提交代码示例

java
public class CustomConsumerByHandSync {
    public static void main(String[] args) {
        // 1. 创建 kafka 消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //3. 创建 kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //4. 设置消费主题 形参是列表
        consumer.subscribe(Arrays.asList("first"));
        //5. 消费数据
        while (true){
        // 读取消息
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        // 输出消息
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord.value());
        }
        // 同步提交 offset
        consumer.commitSync();
        }
    }
}

4.3 异步提交Offset

虽然同步提交Offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交Offset的方式。

4.4 异步提交Offset的示例

java
public class CustomConsumerByHandAsync {
    public static void main(String[] args) {
        // 1. 创建 kafka 消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
 
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 是否自动提交 offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //3. 创建 Kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //4. 设置消费主题 形参是列表
        consumer.subscribe(Arrays.asList("first"));
        //5. 消费数据
        while (true){
            // 读取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 输出消息
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // 异步提交 offset
            consumer.commitAsync();
        }
    }
}

5. 指定Offset消费

配置项auto.offset.reset=earliest|latest|none, 默认是 latest。

  • earliest:自动将偏移量重置为最早的偏移量,和kafka-console-consumer.sh命令行客户端参数--from-beginning语义相同。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。 Alt text

5.1 任意指定Offset位移开始消费

java
public static void main(String[] args) {
    // 0 配置信息
    Properties properties = new Properties();
    // 连接
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
    // key value 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test8989");
    // 1 创建一个消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2 订阅一个主题
    ArrayList<String> topics = new ArrayList<>();
    topics.add("testtopic");
    kafkaConsumer.subscribe(topics);

    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size()==0){
        kafkaConsumer.poll(Duration.ofSeconds(1));
        // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
        assignment = kafkaConsumer.assignment();
    }
    // 遍历所有分区,并指定Offset 从 1700 的位置开始消费
    for (TopicPartition topicPartition : assignment) {
        kafkaConsumer.seek(topicPartition, 1700);
    }
    // 3 消费该主题数据
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }
}

5.2 指定时间消费

应用场景: 在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

java
public static void main(String[] args) {

    // 0 配置信息
    Properties properties = new Properties();
    // 连接
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
    // key value 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test8989");
    // 1 创建一个消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    // 2 订阅一个主题
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size() == 0) {
        kafkaConsumer.poll(Duration.ofSeconds(1));
        // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
        assignment = kafkaConsumer.assignment();
    }
    HashMap<TopicPartition, Long> partitionLongHashMap = new HashMap<>();

    // 封装集合存储,每个分区对应一天前的数据
    for (TopicPartition topicPartition : assignment) {
        partitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
    }
    // 获取从 1 天前开始消费的每个分区的 offset
    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(partitionLongHashMap);
    // 遍历每个分区,对每个分区设置消费时间。
    for (TopicPartition topicPartition : assignment) {
        OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
        // 根据时间指定开始消费的位置
        if (offsetAndTimestamp != null) {
            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
        }
    }
    // 3 消费该主题数据
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }
}