Kafka生产者分区
1. 分区的意义
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
2. 生产者发送消息的分区策略
2.1 Kafka分区源码
Kafka3.x不再使用默认分区器(DefaultPartitioner), 如下图所示,默认分区器已过时。 Kafka直接使用org.apache.kafka.clients.producer.KafkaProducer#partition:
java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// 用户指定分区数不为空,就按照用户的分区数来分区
if (record.partition() != null)
return record.partition();
// 如果传入自定义分区器,使用自定义分区器分区
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}
// 按照消息中的key进行hash取模分区
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}
2.2 Kafka分区规则
3. 自定义分区器
3.1 需求
例如我们实现一个分区器实现,发送过来的数据中如果包含年龄大于60,就发往0号分区,小于60,就发往1号分区。方便消费者保存表数据。
3.2 实现步骤
- 定义类实现Partitioner接口
- 重写partition()方法
java
public class MyPartitioner implements Partitioner {
/* 返回信息对应的分区
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if ("personTable".equals(key)) {
int age = Integer.parseInt(value.toString());
if (age >= 60) {
return 0;
}
}
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println(configs);
}
}
java
public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 配置
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092,192.168.101.107:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.part.MyPartitioner");
// 1. 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2. 发送数据
Random random = new Random();
for (int i = 0; i < 100; i++) {
int i1 = random.nextInt(110);
kafkaProducer.send(new ProducerRecord<>("hadoop", "personTable", ""+i1));
}
//3. 关闭资源
kafkaProducer.close();
}
}