Skip to content

Kafka生产者分区

1. 分区的意义

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

2. 生产者发送消息的分区策略

2.1 Kafka分区源码

Kafka3.x不再使用默认分区器(DefaultPartitioner), 如下图所示,默认分区器已过时。 默认分区器已过时图片 Kafka直接使用org.apache.kafka.clients.producer.KafkaProducer#partition:

java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // 用户指定分区数不为空,就按照用户的分区数来分区
        if (record.partition() != null)
            return record.partition();
        // 如果传入自定义分区器,使用自定义分区器分区
        if (partitioner != null) {
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }
        // 按照消息中的key进行hash取模分区
        if (serializedKey != null && !partitionerIgnoreKeys) {
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }

2.2 Kafka分区规则

Kafka分区规则图

3. 自定义分区器

3.1 需求

例如我们实现一个分区器实现,发送过来的数据中如果包含年龄大于60,就发往0号分区,小于60,就发往1号分区。方便消费者保存表数据。

3.2 实现步骤

  1. 定义类实现Partitioner接口
  2. 重写partition()方法
java
public class MyPartitioner implements Partitioner {
    /* 返回信息对应的分区
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if ("personTable".equals(key)) {
            int age = Integer.parseInt(value.toString());
            if (age >= 60) {
                return 0;
            }
        }
        return 1;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println(configs);
    }
}
java
public class MyProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 配置
        Properties properties = new Properties();
        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092,192.168.101.107:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.part.MyPartitioner");  
        // 1. 创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2. 发送数据
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            int i1 = random.nextInt(110);
            kafkaProducer.send(new ProducerRecord<>("hadoop", "personTable", ""+i1));
        }
        //3. 关闭资源
        kafkaProducer.close();
    }
}