Skip to content

Kafka副本

1. 副本概述

  1. Kafka 副本作用:提高数据可靠性。
  2. Kafka 默认副本 1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  3. Kafka 中副本分为:Leader和Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  4. Kafka 分区中的所有副本统称为AR(Assigned Repllicas)。AR = ISR + OSR ISR表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。OSR表示Follower与Leader副本同步时,延迟过多的副本

2. Leader选举流程

Kafka集群中有一个Broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。Controller的信息同步工作是依赖于Zookeeper的。 Alt text

  1. 创建一个新的 topic,3 个分区,3 个副本
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --create --topic demo_1 --partitions 3 --replication-factor 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic demo_1.
  1. 查看Leader分布情况
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: testtopic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: testtopic	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
  1. 停止掉hadoop106(broderid为2)的Kafka进程,并查看Leader分区情况
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2
	Topic: testtopic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,1
	Topic: testtopic	Partition: 2	Leader: 1	Replicas: 3,1,2	Isr: 1,2
  1. 停止掉hadoop107(broderid为3)的Kafka进程,并查看Leader分区情况
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.105:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1
	Topic: testtopic	Partition: 1	Leader: 1	Replicas: 2,3,1	Isr: 1
	Topic: testtopic	Partition: 2	Leader: 1	Replicas: 3,1,2	Isr: 1
  1. 依次启动hadoop107(broderid为3)、hadoop106(broderid为2)的Kafka进程,并查看Leader分区情况
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.105:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,3
	Topic: testtopic	Partition: 1	Leader: 1	Replicas: 2,3,1	Isr: 1,3
	Topic: testtopic	Partition: 2	Leader: 1	Replicas: 3,1,2	Isr: 1,3
## 此时Leader全是brokerid为1的节点
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.105:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,3,2
	Topic: testtopic	Partition: 1	Leader: 1	Replicas: 2,3,1	Isr: 1,3,2
	Topic: testtopic	Partition: 2	Leader: 1	Replicas: 3,1,2	Isr: 1,3,2
## Kafka会自动重选Leader,Leader并不全是1, 片刻重新查询副本情况
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.105:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,3,2
	Topic: testtopic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 1,3,2
	Topic: testtopic	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 1,3,2
  1. 停止掉hadoop105(broderid为1)的Kafka进程,,并查看Leader分区情况
sh
## Partition为0的分区并没有出现Leader为3(如果按照此时的Isr次序(1,3,2)看的话,1之后就是3),
## 实际是按照AR顺序(Replicas的顺序), 先根据Isr过滤AR列表,之后按照AR列表顺序选出Leader.
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --describe --topic testtopic
Topic: testtopic	TopicId: MnxtaR5OSxGVvnLX2c8KJQ	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: testtopic	Partition: 0	Leader: 2	Replicas: 1,2,3	Isr: 3,2
	Topic: testtopic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 3,2
	Topic: testtopic	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,2

3. Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
Alt text

4. Leader故障处理细节

Alt text

5. 手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

  1. 查看分区副本存储情况
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --describe --topic demo2
Topic: demo2	TopicId: 8z3pHJvxRDi5AdiYSy7GIw	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: demo2	Partition: 0	Leader: 4	Replicas: 4,2	Isr: 4,2
	Topic: demo2	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3
  1. 创建副本存储计划(所有副本都指定存储在broker2、broker3中)。
sh
[jack@hadoop105 kafka-3.6.1]$ vi change-replication-factor-demo.json
[jack@hadoop105 kafka-3.6.1]$ cat change-replication-factor-demo.json 
{
"version":1,
"partitions":[
    {"topic":"demo2","partition":0,"replicas":[2,3]},
    {"topic":"demo2","partition":1,"replicas":[2,3]}
	]
}
  1. 执行副本存储计划
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-reassign-partitions.sh --bootstrap-server hadoop106:9092 --reassignment-json-file change-replication-factor-demo.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"demo2","partition":0,"replicas":[4,2],"log_dirs":["any","any"]},{"topic":"demo2","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for demo2-0,demo2-1
  1. 验证副本存储计划
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-reassign-partitions.sh --bootstrap-server hadoop106:9092 --reassignment-json-file change-replication-factor-demo.json --
verify
Status of partition reassignment:
Reassignment of partition demo2-0 is completed.
Reassignment of partition demo2-1 is completed.
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server 192.168.101.106:9092 --describe --topic demo2
Topic: demo2	TopicId: 8z3pHJvxRDi5AdiYSy7GIw	PartitionCount: 2	ReplicationFactor: 2	Configs: 
	Topic: demo2	Partition: 0	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: demo2	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3

6. 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

6.1 自动平衡触发条件

  • auto.leader.rebalance.enable,默认是true。开启自动Leader Partition平衡
  • leader.imbalance.per.broker.percentage,默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
  • leader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。
    假设集群有一个主题如下图所示:
    Alt text 针对Broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1,AR副本总数是4, 所以broker0节点不平衡率为1/4>10%,需要再平衡。Broker2、Broker3节点和Broker0不平衡率一样,需要再平衡。Broker1的不平衡数为0,不需要再平衡。

提示

生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为false关闭。

7. 增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

7.1 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。

sh
[jack@hadoop105 kafka-3.6.1]$ vim increase-replication-factor.json

输入如下内容:

json
{"version":1,"partitions":[{"topic":"four","partition":0,"replica
s":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"t
opic":"four","partition":2,"replicas":[0,1,2]}]}

7.2 执行副本存储计划

sh
[jack@hadoop105 kafka-3.6.1]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file 
increase-replication-factor.json --execute