Kafka消费者优化
1. 消费者事务
1.1 重复消费与漏消费
重复消费:已经消费了数据,但是Offset没提交。
漏消费:先提交Offset后消费,有可能会造成数据的漏消费。
1.2 解决方案之消费者事务
先提交Offset还是先消费数据都是问题,因为分别会造成漏消费和重复消费问题,如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交Offset过程做原子绑定。仅仅靠双方(提交Offset、消费数据)不能解决问题,需要事务来解决,要么全部失败,要么全部成功。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。 问题与限制:
- 数据必须都要放在一个关系型数据库中,无法使用其他功能强大的nosql数据库。
- 事务本身性能不好。
- 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务的问题。分布式事务会带来管理的复杂性,一般企业不选择使用,有的企业会把分布式事务变成本地事务,例如把Executor上的数据通过rdd.collect算子提取到Driver端,由Driver 端统一写入数据库,这样会将分布式事务变成本地事务的单线程操作,降低了写入的吞吐量。
使用场景:
数据足够少(通常经过聚合后的数据量都比较小,明细数据一般数据量都比较大),并且支持事务的数据库。
java
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.102:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
// 配置消费者组(组名任意起名)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test999");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 默认就是true
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 设置1s
Set<TopicPartition> assignment = new HashSet<>();
// 创建消费者
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
List<String> topics = Collections.singletonList("sparkTopic");
kafkaConsumer.subscribe(topics);
while (assignment.isEmpty()){
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
long offset = DBUtil.getMaxOffset()
// 遍历所有分区,并指定offset从指定的位置开始消费
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition, offset);
}
// 拉取数据打印
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> msg : consumerRecords) {
System.out.println("偏移量: " + msg.offset() + "\t分区:" + msg.partition() + "\t消息key: " + msg.key() + "\t消息内容: " + msg.value());
DbUtil.startTranstion();
// 数据表上新增一列offset, 记录消费位置
DBUtil.insertData(msg.value(), msg.offset());
// 提交 offset
kafkaConsumer.commitSync();
DbUtil.commit();
}
}
}
}
1.3 解决方案之后置提交Offset+幂等方案
首先解决数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工来控制偏移量的提交时机。保证了at least once(数据至少消费一次),把数据的保存做成幂等性保存,即同一批数据反复保存多次,数据不会翻倍,保存一次和保存一百次的效果是一样的。
使用场景 处理数据较多,或者数据保存在不支持事务的数据库上。
2. 消费者提高吞吐量
参数名称 | 描述 |
---|---|
fetch.max.bytes | 默认 Default: 52428800(50M)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50M)仍然可以拉取回来这批数据,因此这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)ormax.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是500条 |