Skip to content

Kafka生产者

生产者操作脚本:kafka-console-producer.sh

1. 生产者命令参数

参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称

2.生产者发送消息

sh
# 进入交互模式
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-producer.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --topic hadoop
>I am learning
>

3. 发送原理

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka的Broker。 原理图

4.生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的broker地址清单。
例如hadoop102:9092,hadoop103:9092,hadoop104:9092,
可以设置1个或者多个,中间用逗号隔开。
注意这里并非需要所有的broker地址,
因为生产者从给定的broker里查找到其他broker信息。
key.serializer指定发送消息的 key 的序列化类型。要写全类名。
value.serializer指定发送消息的value的序列化类型。要写全类名。
buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
batch.size缓冲区一批数据最大值,默认16k。
适当增加该值,可以提高吞吐量,但是如果该值设置太大,
会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,
sender等待linger.time之后就会发送数据。
单位ms,默认值是0ms,表示没有延迟。
生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,
Leader+和 isr 队列里面的所有节点收齐数据后应答。
默认值是-1,-1和all是等价的。
max.in.flight.requests.
per.connection
单个连接(对单个Broker的连接)上
允许的未确认(in-flight)请求的最大数量,默认为5,
开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。
retries表示重试次数。默认是int最大值2147483647。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。
默认是none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4和zstd。

5. 编码发送消息

5.1 pom.xml加入依赖

xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.17.2</version>
</dependency>

5.2 异步不回调

java
 public static void main(String[] args) {
    // 配置
    Properties properties = new Properties();
    // 连接集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 1. 创建生产者对象
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    //2. 发送数据
    kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "hi"));
    //3. 关闭资源
    kafkaProducer.close();
}

信息

如果发生卡顿在close(),很可能是发送消息阻塞了,原因是Zookeeper的元数据里面用的Kafka主机名而不是IP地址以及端口写错,导致找不到Broker接收。

消费者端控制台消费: 消费者端成功消费

5.3 带回调函数的异步发送

回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息RecordMetadata和异常信息Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

提示

消息发送失败会自动重试,不需要我们在回调函数中手动重试。

java
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        kafkaProducer.send(new ProducerRecord<>("hadoop", "msgggg","我是spark"+i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception==null){
                    System.out.println("当前"+ finalI +"消息发送完成, 所在分区:"+metadata.partition());
                }else{
                    exception.printStackTrace();
                }
            }
        });
    }
    kafkaProducer.close();
}

5.4 同步发送

只需在异步发送的基础上,再调用一下 get() 方法即可

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    for (int i = 0; i < 10; i++) {
        kafkaProducer.send(new ProducerRecord<>("hadoop", "msgggg","我是spark"+i).get();
    }
    kafkaProducer.close();
}