Consumer API
1. Standalone Consumer Subscribing to a Topic
Requirement: create a standalone consumer that consumes the data in the hadoop topic.
Important note
Code that calls the consumer API must explicitly configure a consumer group id. The kafka-console-consumer.sh command-line script can start a consumer without one only because it auto-generates a random consumer group id.
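For reference, a minimal sketch of why the group id is mandatory in code: with recent Kafka Java clients (2.2+), calling subscribe() and then poll() on a consumer that has no group.id configured fails with an InvalidGroupIdException. Exact behavior may vary by client version; the broker address is reused from the examples below.

```java
// Sketch: a consumer configured WITHOUT ConsumerConfig.GROUP_ID_CONFIG.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("hadoop"));
    consumer.poll(Duration.ofSeconds(2)); // expected to throw: subscribe() requires a group id
} catch (InvalidGroupIdException e) {
    // org.apache.kafka.common.errors.InvalidGroupIdException
    System.out.println("A group.id is required when using subscribe(): " + e.getMessage());
}
```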
Code implementation:
```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to the cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        // Configure the deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (any group name will do)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = Collections.singletonList("hadoop");
        kafkaConsumer.subscribe(topics);
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("offset: " + msg.offset() + "\tpartition: " + msg.partition()
                        + "\tkey: " + msg.key() + "\tvalue: " + msg.value());
            }
        }
    }
}
```
On the Kafka cluster console, start a console producer and type in some messages.
```sh
[jack@hadoop101 kafka-3.6.1]$ ./bin/kafka-console-producer.sh --topic hadoop --bootstrap-server 192.168.101.105:9092
>hello
>I am ok
>zhongguo
>中国
>这是一个测试消息
```
Run result: the consumer prints the messages typed into the console producer above.
2. Standalone Consumer Subscribing to a Partition
Requirement: create a standalone consumer that consumes the data in partition 0 of the testtopic topic. Code implementation:
```java
// Imports are the same as in CustomConsumer above, plus org.apache.kafka.common.TopicPartition.
public class CustomConsumerPart {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        // Configure the deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (any group name will do)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test123");
        // Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // assign() takes explicit partitions instead of subscribing to a whole topic
        List<TopicPartition> topics = Collections.singletonList(new TopicPartition("testtopic", 0));
        kafkaConsumer.assign(topics);
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("offset: " + msg.offset() + "\tpartition: " + msg.partition()
                        + "\tkey: " + msg.key() + "\tvalue: " + msg.value());
            }
        }
    }
}
```
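Since assign() bypasses the group rebalance protocol, the consumer's position can also be controlled explicitly. A minimal sketch of how the assignment above could be combined with seeking; the offset value 100 is purely illustrative:

```java
// Fragment: could replace the assign(...) call inside CustomConsumerPart above.
TopicPartition partition0 = new TopicPartition("testtopic", 0);
kafkaConsumer.assign(Collections.singletonList(partition0));
// Re-read the partition from its earliest available offset ...
kafkaConsumer.seekToBeginning(Collections.singletonList(partition0));
// ... or jump straight to an arbitrary offset.
kafkaConsumer.seek(partition0, 100L);
```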
A Kafka producer that pushes messages specifically to partition 0 of the testtopic topic:
```java
// Excerpt: the producer's main method (enclosing class omitted, as in the original).
public static void main(String[] args) {
    // Configuration
    Properties properties = new Properties();
    // Connect to the cluster
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092,192.168.101.107:9092");
    // Configure the serializers
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Note: a custom partitioner configured here would be ignored, because the ProducerRecord
    // below names partition 0 explicitly, which always takes precedence over the partitioner.
    // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.producer.part.MyPartitioner");
    // 1. Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    // 2. Send data
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int i1 = random.nextInt(110);
        // Send to partition 0 explicitly
        kafkaProducer.send(new ProducerRecord<>("testtopic", 0, "user", "id:" + i1));
    }
    // 3. Release resources
    kafkaProducer.close();
}
```
Run result: every record the consumer prints comes from partition 0 of testtopic.
3. Consumer Group Subscribing to a Topic
Requirement: verify that the partitions of a single topic are divided among the consumers of one consumer group. Hands-on code:
Kafka consumer group test345 subscribes to the topic testtopic:
```java
// Imports are the same as in CustomConsumer above.
public class CustomGroupConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (any group name will do)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test345");
        // Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = Collections.singletonList("testtopic");
        kafkaConsumer.subscribe(topics);
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("offset: " + msg.offset() + "\tpartition: " + msg.partition()
                        + "\tkey: " + msg.key() + "\tvalue: " + msg.value());
            }
        }
    }
}
```
A second consumer, CustomGroup1Consumer, joins the same group test345; apart from the class name it is identical:
```java
// Imports are the same as in CustomConsumer above.
public class CustomGroup1Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Configure the consumer group (same group name as above)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test345");
        // Create the consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = Collections.singletonList("testtopic");
        kafkaConsumer.subscribe(topics);
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> msg : consumerRecords) {
                System.out.println("offset: " + msg.offset() + "\tpartition: " + msg.partition()
                        + "\tkey: " + msg.key() + "\tvalue: " + msg.value());
            }
        }
    }
}
```
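How the group splits the partitions of testtopic between its two members is governed by the assignment strategy. As a hedged sketch, the strategy can be chosen explicitly via partition.assignment.strategy (RangeAssignor is the long-standing default; check the default list for your client version):

```java
// Optional fragment: add before creating the KafkaConsumer to pick the
// partition assignment strategy for the group explicitly.
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
```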
Start the CustomGroup1Consumer and CustomGroupConsumer consumers one after the other, then start the CustomProducer producer:
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 500; i++) {
            int finalI = i;
            // A varying key helps spread messages across the topic's partitions
            kafkaProducer.send(new ProducerRecord<>("testtopic", null, i + "", "I am spark " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message " + finalI + " sent, partition: " + metadata.partition());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(10);
        }
        kafkaProducer.close();
    }
}
```
Run result: you can observe that once the consumers in the Kafka consumer group come online, each one is assigned a specific subset of the topic's partitions to consume.
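To watch that assignment happen from inside a consumer, a ConsumerRebalanceListener can be registered at subscribe time. A minimal sketch, intended as a drop-in replacement for the subscribe(...) call in the group consumers above (additionally requires java.util.Collection, org.apache.kafka.common.TopicPartition, and org.apache.kafka.clients.consumer.ConsumerRebalanceListener):

```java
kafkaConsumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away from this consumer
        System.out.println("Partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after a rebalance hands partitions to this consumer
        System.out.println("Partitions assigned: " + partitions);
    }
});
```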