Kafka生产者
生产者操作脚本:kafka-console-producer.sh
1. 生产者命令参数
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
2.生产者发送消息
sh
# 进入交互模式
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-producer.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --topic hadoop
>I am learning
>
3. 发送原理
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator
。main线程将消息发送给RecordAccumulator
,Sender线程不断从RecordAccumulator
中拉取消息发送到Kafka的Broker。
4.生产者重要参数列表
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单。 例如hadoop102:9092,hadoop103:9092,hadoop104:9092, 可以设置1个或者多个,中间用逗号隔开。 注意这里并非需要所有的broker地址, 因为生产者从给定的broker里查找到其他broker信息。 |
key.serializer | 指定发送消息的 key 的序列化类型。要写全类名。 |
value.serializer | 指定发送消息的value的序列化类型。要写全类名。 |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。 适当增加该值,可以提高吞吐量,但是如果该值设置太大, 会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size, sender等待linger.time之后就会发送数据。 单位ms,默认值是0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。 默认值是-1,-1和all是等价的。 |
max.in.flight.requests. per.connection | 单个连接(对单个Broker的连接)上 允许的未确认(in-flight)请求的最大数量,默认为5, 开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。默认是int最大值2147483647。 如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
enable.idempotence | 是否开启幂等性,默认true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。 默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
5. 编码发送消息
5.1 pom.xml加入依赖
xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
</dependency>
5.2 异步不回调
java
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092,192.168.101.106:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 1. 创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//2. 发送数据
kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "hi"));
//3. 关闭资源
kafkaProducer.close();
}
信息
如果发生卡顿在close(),很可能是发送消息阻塞了,原因是Zookeeper的元数据里面用的Kafka主机名而不是IP地址以及端口写错,导致找不到Broker接收。
消费者端控制台消费:
5.3 带回调函数的异步发送
回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息RecordMetadata
和异常信息Exception
,如果Exception
为null,说明消息发送成功,如果Exception
不为null,说明消息发送失败。
提示
消息发送失败会自动重试,不需要我们在回调函数中手动重试。
java
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
int finalI = i;
kafkaProducer.send(new ProducerRecord<>("hadoop", "msgggg","我是spark"+i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception==null){
System.out.println("当前"+ finalI +"消息发送完成, 所在分区:"+metadata.partition());
}else{
exception.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
5.4 同步发送
只需在异步发送的基础上,再调用一下 get() 方法即可
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.105:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("hadoop", "msgggg","我是spark"+i).get();
}
kafkaProducer.close();
}