Skip to content

Kafka消费者优化

1. 消费者事务

1.1 重复消费与漏消费

重复消费:已经消费了数据,但是Offset没提交。
漏消费:先提交Offset后消费,有可能会造成数据的漏消费。 Alt text

1.2 解决方案之消费者事务

先提交Offset还是先消费数据都是问题,因为分别会造成漏消费和重复消费问题,如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交Offset过程做原子绑定。仅仅靠双方(提交Offset、消费数据)不能解决问题,需要事务来解决,要么全部失败,要么全部成功。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。 Alt text 问题与限制:

  1. 数据必须都要放在一个关系型数据库中,无法使用其他功能强大的nosql数据库。
  2. 事务本身性能不好。
  3. 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。分布式事务会带来管理的复杂性,一般企业不选择使用,有的企业会把分布式事务变成本地事务,例如把Executor上的数据通过rdd.collect算子提取到Driver端,由Driver 端统一写入数据库,这样会将分布式事务变成本地事务的单线程操作,降低了写入的吞吐量。

使用场景:数据足够少(通常经过聚合后的数据量都比较小,明细数据一般数据量都比较大),并且支持事务的数据库。

java
static Logger logger = LoggerFactory.getLogger(Test02.class);
    
public static void main(String[] args) {
    
    String topic = "ods_qjk_data";
    String groupId = "default_group";
    // 获取 Kafka 消费者实例
    try (KafkaConsumer<String, byte[]> kafkaConsumer = KafkaUtil.getConsumerInstance(groupId)) {
        Set<TopicPartition> assignment = new HashSet<>();
        // 订阅主题
        kafkaConsumer.subscribe(List.of(topic));
        /**
         * 主动联系Kafka,获取分配给自己的分区信息
         */
        while (assignment.isEmpty()){
            // poll方法的主要目的并非拉取消息,而是触发Kafka进行分区分配操作。
            // 这里代码的确拿了一些Kafka消息,但是此时的消息是没有任何意义的,因为还没有指定offset
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取主题分区信息, 获取分区信息后退出while循环
            assignment = kafkaConsumer.assignment();
        }
        RedissonClient redisClient = RedisUtil.initValidConnection();
        RMap<Integer, Long> map = redisClient.<Integer, Long>getMap(topic+"-"+groupId);

        // 遍历已分配分区信息,并设置每个分区的消费位置
        for (TopicPartition topicPartition : assignment) {
            Long lastOffset = map.get(topicPartition.partition());
            if(lastOffset == null){
                lastOffset = 0L;
            }
            // 指定offset从指定的位置开始消费
            kafkaConsumer.seek(topicPartition, lastOffset);
        }
        while (true) {
            // 真正拉取未消费的消息, 拉取消息超时时间为1秒,一秒内默认最多拿500条消息,默认最多50M大小
            // Kafka客户端底层收到数据负责组装消息
            ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, byte[]> record : records) {
                RTransaction transaction = redisClient.createTransaction(TransactionOptions.defaults());
                // 获取事务中的 RMap 对象
                RMap<Integer, Long> rmap = transaction.<Integer, Long>getMap(topic+"-"+groupId);
                // 获取消息的时间戳
                long ts = record.timestamp();
                int partition = record.partition();
                long offset = record.offset();
                try {
                    logger.debug("正在处理kafka消息:ts: {}, partition:{}, offset: {}", ts, partition, offset);
                    // TODO 业务逻辑
                    // 处理一条消息    记录一条最新offset
                    rmap.put(partition, offset);
                    // 提交事务
                    transaction.commit();
                }catch (Exception e){
                    logger.error("处理kafka消息失败", e);
                    // 出现异常,回滚事务
                    transaction.rollback();
                    break;
                }
            }
        }
    }
}
java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.102:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
    //  配置消费者组(组名任意起名)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test999");
    // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 默认就是true
    // 提交 offset 的时间周期 1000ms,默认 5s
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 设置1s

    Set<TopicPartition> assignment = new HashSet<>();
    // 创建消费者
    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
        List<String> topics = Collections.singletonList("sparkTopic");
        kafkaConsumer.subscribe(topics);
        while (assignment.isEmpty()){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = kafkaConsumer.assignment();
        }
        long offset = DBUtil.getMaxOffset("sparkTopic")
        // 遍历所有分区,并指定offset从指定的位置开始消费
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, offset);
        }
        // 拉取数据打印
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("偏移量: " + msg.offset() + "\t分区:" + msg.partition() + "\t消息key: " + msg.key() + "\t消息内容: " + msg.value());
                DbUtil.startTranstion();
                // 数据表上新增一列offset, 记录消费位置
                DBUtil.insertData(msg.value(), msg.offset());
                // 提交 offset
                kafkaConsumer.commitSync();
                DbUtil.commit();
            }
        }
    }
}
java

1.3 解决方案之后置提交Offset+幂等方案

首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。保证了at least once(数据至少消费一次),把数据的保存做成幂等性保存,即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。
使用场景:处理数据较多,或者数据保存在不支持事务的数据库上。

2. 消费者提高吞吐量

Alt text

参数名称描述
fetch.max.bytes默认 Default: 52428800(50M)。
消费者获取服务器端一批消息最大的字节数。
如果服务器端一批次的数据大于该值(50M)仍然可以拉取回来这批数据,
因此这不是一个绝对最大值。
一批次的大小受message.max.bytes max.message.bytes共同影响。
max.poll.records一次 poll拉取数据返回消息的最大条数,默认是500条