Skip to content

消费者组中分区的分配以及再平衡

  1. 一个consumer group中有多个consumer组成,一个Topic有多个Partition组成,现在的问题是,到底由哪个consumer来消费哪个Partition的数据。
  2. Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

1. 消费者组中分区的分配

Alt text 参数说明:

参数名称描述
heartbeat.interval.msKafka 消费者和coordinator之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeout.ms的 1/3。
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy消费者分区分配策略 , 默 认 策 略 是Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : RangeRoundRobinStickyCooperativeSticky

2. Range策略以及再平衡

2.1 Range分区策略原理

Alt text

2.2 Range分区代码实操

  1. 修改testtopic分区,改成7个
sh
[jack@hadoop107 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.107:9092 --alter --topic testtopic --partitions 7
[jack@hadoop107 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.107:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 7	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 3,1,2
	Topic: testtopic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 3,1,2
	Topic: testtopic	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: testtopic	Partition: 3	Leader: 4	Replicas: 4,1,2	Isr: 4,1,2
	Topic: testtopic	Partition: 4	Leader: 1	Replicas: 1,3,4	Isr: 1,3,4
	Topic: testtopic	Partition: 5	Leader: 2	Replicas: 2,4,1	Isr: 2,4,1
	Topic: testtopic	Partition: 6	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
  1. 同时启动 3个消费者, 指定消费者组test666, 指定消费主题为testtopic。
java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}
java
public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}
java
public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}

提示

Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略。

  1. 启动CustomProducer生产者,发送 500 条消息,随机发送到不同的分区。
java
public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    for (int i = 0; i < 500; i++) {
        int finalI = i;
        // 指定key值有助于打散消息到不同分区
        kafkaProducer.send(new ProducerRecord<>("testtopic",null, i+"", "我是spark"+i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception==null){
                    System.out.println("当前"+ finalI +"消息发送完成, 所在分区:"+metadata.partition());
                }else{
                    exception.printStackTrace();
                }
            }
        });
        Thread.sleep(10);
    }
    kafkaProducer.close();
}

运行结果: Alt textAlt text

2.3 Range分区分配再平衡

  1. 如图可以看到实际消费者分配分区情况如下:
    • CustomGroupConsumer消费者:消费到 5、6 号分区数据。
    • CustomGroup1Consumer消费者:消费到 0、1、2 号分区数据。
    • CustomGroup2Consumer消费者:消费到 3、4 号分区数据。
  2. 停止掉CustomGroup1Consumer消费者,快速重新发送消息观看结果(45s 以内,越快越好): CustomGroup1Consumer消费者的任务会整体被分配到CustomGroupConsumer消费者或者CustomGroup2Consumer消费者。
    Alt text

提示

CustomGroup1Consumer消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他 broker执行。

  1. 再次重新发送消息观看结果(45s 以后)
  • CustomGroup2Consumer消费者:消费到0、1、2、3号分区数据。
  • CustomGroupConsumer消费者:消费到4、5、6号分区数据。 Alt text

提示

消费者CustomGroup1Consumer已经被踢出消费者组,所以重新按照Range方式分配。

3. RoundRobin策略以及再平衡

3.1 RoundRobin分区策略原理

Alt text

3.2 RoundRobin分区代码实操

  1. 依次在CustomGroupConsumer、CustomGroup1Consumer、CustomGroup2Consumer三个消费者代码中修改分区分配策略为RoundRobin:
java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}
java
public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}
java
public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     // 修改分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");

    // 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<String> topics = Collections.singletonList("testtopic");
    kafkaConsumer.subscribe(topics);
    // 拉取数据打印
    while (true){
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> msg : consumerRecords) {
            System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
        }
    }
}
  1. 使用同样的生产者发送 500 条消息, 查看分区结果
    分区结果图

3.3 RoundRobin分区分配再平衡

  1. 如图可以看到实际消费者分配分区情况如下:
    • CustomGroupConsumer消费者:消费到0、3、6号分区数据。
    • CustomGroup1Consumer消费者:消费到1、4号分区数据。
    • CustomGroup2Consumer消费者:消费到2、5号分区数据。
  2. 停止掉CustomGroupConsumer消费者,快速重新发送消息观看结果(45s 以内,越快越好):
    消费者的任务会按照RoundRobin的方式,把数据轮询分成0、3和6号分区数据,分别由CustomGroup1Consumer消费者或者CustomGroup2Consumer消费者消费。
    Alt text

提示

CustomGroup1Consumer消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

  1. 再次重新发送消息观看结果(45s以后)
    Alt text

提示

消费者CustomGroupConsumer已经被踢出消费者组,所以重新按照RoundRobin方式分配。

4. Sticky以及再平衡

4.1 粘性分区定义

可以理解为分配的结果带有"粘性的"。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

4.2 Sticky分区代码实操

  1. 需求:
    设置主题为testtopic,7个分区;准备3个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
  2. 修改CustomGroupConsumer、CustomGroup1Consumer、CustomGroup2Consumer配置分区分配策略为粘性
java
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, tartegys);

信息

3个消费者如果启动出现报错,全部停止等会再重启,或者修改为全新的消费者组。

  1. 使用同样的生产者发送500条消息,查看分区结果
    Alt text 可以看到Kafka使用Sticky分区策略会尽量保持分区的个数近似划分分区。

4.3 Sticky分区分配再平衡

  1. 可以看到目前实际消费者分配分区情况如下:
    • CustomGroupConsumer消费者:消费到4、5号分区数据。
    • CustomGroup1Consumer消费者:消费到0、1、2号分区数据。
    • CustomGroup2Consumer消费者:消费到3、6号分区数据。
  2. 停止掉CustomGroupConsumer消费者,快速重新发送消息观察结果(45s 以内,越快越好):
    Alt text
    CustomGroupConsumer消费者的任务会按照粘性规则,尽可能均衡的随机分成4和5号分区数据,分别由CustomGroup1Consumer消费者和CustomGroup2Consumer消费者消费。
  3. 再次重新发送消息观看结果(45s 以后)
    • CustomGroup1Consumer消费者:消费到 0、1、5 号分区数据。
    • CustomGroup2Consumer消费者:消费到 3、4、6 号分区数据。
      Alt text 说明:消费者CustomGroup2Consumer已经被踢出消费者组,所以重新按照粘性方式分配。