消费者组中分区的分配以及再平衡
- 一个consumer group中有多个consumer组成,一个Topic有多个Partition组成,现在的问题是,到底由哪个consumer来消费哪个Partition的数据。
- Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
1. 消费者组中分区的分配
参数说明:
参数名称 | 描述 |
---|---|
heartbeat.interval.ms | Kafka 消费者和coordinator之间的心跳时间,默认 3s。该条目的值必须小于session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
partition.assignment.strategy | 消费者分区分配策略 , 默 认 策 略 是Range + CooperativeSticky 。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : Range 、RoundRobin 、Sticky 、CooperativeSticky |
2. Range策略以及再平衡
2.1 Range分区策略原理
2.2 Range分区代码实操
- 修改testtopic分区,改成7个
sh
[jack@hadoop107 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.107:9092 --alter --topic testtopic --partitions 7
[jack@hadoop107 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.107:9092 --describe --topic testtopic
Topic: testtopic TopicId: MnxtaR5OSxGVvnLX2c8KJQ PartitionCount: 7 ReplicationFactor: 3 Configs:
Topic: testtopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 3,1,2
Topic: testtopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
Topic: testtopic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: testtopic Partition: 3 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
Topic: testtopic Partition: 4 Leader: 1 Replicas: 1,3,4 Isr: 1,3,4
Topic: testtopic Partition: 5 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
Topic: testtopic Partition: 6 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
- 同时启动 3个消费者, 指定消费者组test666, 指定消费主题为testtopic。
java
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
java
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
java
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
提示
Kafka默认的分区分配策略就是Range
+ CooperativeSticky
,所以不需要修改策略。
- 启动CustomProducer生产者,发送 500 条消息,随机发送到不同的分区。
java
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 500; i++) {
int finalI = i;
// 指定key值有助于打散消息到不同分区
kafkaProducer.send(new ProducerRecord<>("testtopic",null, i+"", "我是spark"+i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception==null){
System.out.println("当前"+ finalI +"消息发送完成, 所在分区:"+metadata.partition());
}else{
exception.printStackTrace();
}
}
});
Thread.sleep(10);
}
kafkaProducer.close();
}
运行结果:
2.3 Range分区分配再平衡
- 如图可以看到实际消费者分配分区情况如下:
- CustomGroupConsumer消费者:消费到 5、6 号分区数据。
- CustomGroup1Consumer消费者:消费到 0、1、2 号分区数据。
- CustomGroup2Consumer消费者:消费到 3、4 号分区数据。
- 停止掉CustomGroup1Consumer消费者,快速重新发送消息观看结果(45s 以内,越快越好): CustomGroup1Consumer消费者的任务会整体被分配到CustomGroupConsumer消费者或者CustomGroup2Consumer消费者。
提示
CustomGroup1Consumer消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他 broker执行。
- 再次重新发送消息观看结果(45s 以后)
- CustomGroup2Consumer消费者:消费到0、1、2、3号分区数据。
- CustomGroupConsumer消费者:消费到4、5、6号分区数据。
提示
消费者CustomGroup1Consumer已经被踢出消费者组,所以重新按照Range方式分配。
3. RoundRobin策略以及再平衡
3.1 RoundRobin分区策略原理
3.2 RoundRobin分区代码实操
- 依次在CustomGroupConsumer、CustomGroup1Consumer、CustomGroup2Consumer三个消费者代码中修改分区分配策略为RoundRobin:
java
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
java
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
java
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test666");
// 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topics = Collections.singletonList("testtopic");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
}
}
}
- 使用同样的生产者发送 500 条消息, 查看分区结果
3.3 RoundRobin分区分配再平衡
- 如图可以看到实际消费者分配分区情况如下:
- CustomGroupConsumer消费者:消费到0、3、6号分区数据。
- CustomGroup1Consumer消费者:消费到1、4号分区数据。
- CustomGroup2Consumer消费者:消费到2、5号分区数据。
- 停止掉CustomGroupConsumer消费者,快速重新发送消息观看结果(45s 以内,越快越好):
消费者的任务会按照RoundRobin的方式,把数据轮询分成0、3和6号分区数据,分别由CustomGroup1Consumer消费者或者CustomGroup2Consumer消费者消费。
提示
CustomGroup1Consumer消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。
- 再次重新发送消息观看结果(45s以后)
提示
消费者CustomGroupConsumer已经被踢出消费者组,所以重新按照RoundRobin方式分配。
4. Sticky以及再平衡
4.1 粘性分区定义
可以理解为分配的结果带有"粘性的"。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
4.2 Sticky分区代码实操
- 需求:
设置主题为testtopic,7个分区;准备3个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 - 修改CustomGroupConsumer、CustomGroup1Consumer、CustomGroup2Consumer配置分区分配策略为粘性
java
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, tartegys);
信息
3个消费者如果启动出现报错,全部停止等会再重启,或者修改为全新的消费者组。
- 使用同样的生产者发送500条消息,查看分区结果
可以看到Kafka使用Sticky分区策略会尽量保持分区的个数近似划分分区。
4.3 Sticky分区分配再平衡
- 可以看到目前实际消费者分配分区情况如下:
- CustomGroupConsumer消费者:消费到4、5号分区数据。
- CustomGroup1Consumer消费者:消费到0、1、2号分区数据。
- CustomGroup2Consumer消费者:消费到3、6号分区数据。
- 停止掉CustomGroupConsumer消费者,快速重新发送消息观察结果(45s 以内,越快越好):
CustomGroupConsumer消费者的任务会按照粘性规则,尽可能均衡的随机分成4和5号分区数据,分别由CustomGroup1Consumer消费者和CustomGroup2Consumer消费者消费。 - 再次重新发送消息观看结果(45s 以后)
- CustomGroup1Consumer消费者:消费到 0、1、5 号分区数据。
- CustomGroup2Consumer消费者:消费到 3、4、6 号分区数据。
说明:消费者CustomGroup2Consumer已经被踢出消费者组,所以重新按照粘性方式分配。