Skip to content

消费者 API

1. 独立消费者订阅主题

需求: 创建一个独立消费者,消费first主题中数据。 Alt text

重要提示

编码调用消费者API代码中必须配置指定消费者组id。命令行脚本kafka-console-consumer.sh启动消费者可以不填写消费者组id是因为会被自动生成随机的消费者组id。

代码实现:

java
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        // 配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //  配置消费者组(组名任意起名)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        List<String> topics = Collections.singletonList("hadoop");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
            }
        }
    }
}

在Kafka集群控制台,创建Kafka生产者,并输入数据。

sh
[jack@hadoop101 kafka-3.6.1]$ ./bin/kafka-console-producer.sh --topic hadoop --bootstrap-server 192.168.101.105:9092
>hello
>I am ok
>zhongguo
>中国
>这是一个测试消息

运行结果: 打印消费信息图

2. 独立消费者订阅分区

需求:创建一个独立消费者,消费testtopic主题0号分区的数据。 Alt text 编码实现:

java
public class CustomConsumerPart {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        // 配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //  配置消费者组(组名任意起名)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test123");

        // 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        List<TopicPartition> topics = Collections.singletonList(new TopicPartition("testtopic", 0));
        kafkaConsumer.assign(topics);
        // 拉取数据打印
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
            }
        }
    }
}

Kafka 生产者指定往testtopic主题分区0推送消息

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 配置
    Properties properties = new Properties();
    // 连接集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092,192.168.101.107:9092");
    // 配置序列化
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.producer.part.MyPartitioner");
    // 1. 创建生产者对象
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    //2. 发送数据
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int i1 = random.nextInt(110);
        // 指定往分区 0 生产消息
        kafkaProducer.send(new ProducerRecord<>("testtopic", 0, "user", "id:"+i1));
    }
    //3. 关闭资源
    kafkaProducer.close();
}

运行结果:
Alt text

3. 消费者组订阅主题

需求:测试同一个主题的分区数据,由一个消费者组消费。 代码实操:
Kafka消费者组test345去订阅主题testtopic

java
public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //  配置消费者组(组名任意起名)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test345");

        // 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        List<String> topics = Collections.singletonList("testtopic");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
            }
        }
    }
java
public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //  配置消费者组(组名任意起名)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test345");

        // 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        List<String> topics = Collections.singletonList("testtopic");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("偏移量: "+ msg.offset()+"\t分区:"+msg.partition()+"\t消息key: "+msg.key()+"\t消息内容: "+msg.value());
            }
        }
    }

依次启动CustomGroup1Consumer、CustomGroupConsumer消费者,在启动生产者CustomProducer

java
public class CustomProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 500; i++) {
            int finalI = i;
            // 指定key值有助于打散消息到不同分区
            kafkaProducer.send(new ProducerRecord<>("testtopic",null, i+"", "我是spark"+i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("当前"+ finalI +"消息发送完成, 所在分区:"+metadata.partition());
                    }else{
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(10);
        }
        kafkaProducer.close();
    }
}

运行结果: Alt text 可以观察到Kafka消费者组中的消费者上线过后会被指定消费指定的一些分区的数据。