
Kafka Producer Source Code

1. Initialization

1.1 Main-thread initialization of the producer

Click KafkaProducer() inside the main() method.
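
For context, a minimal sketch of such a CustomProducer is shown below (the bootstrap address and topic name are assumptions for illustration):

java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // new KafkaProducer<>(props) is the call we are about to step into
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}

The call enters the following chain of public constructors: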

java
public KafkaProducer(Properties properties) {
    this(properties, null, null);
}

public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}

public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
            keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}

These overloads all delegate to the main KafkaProducer constructor:


java
KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  Time time) {
    try {
        this.producerConfig = config;
        this.time = time;
        // Read the transactional id (transactional.id)
        String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        // Read the client id (client.id)
        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

        LogContext logContext;
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");

        Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
        // Metrics and monitoring configuration
        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
        this.producerMetrics = new KafkaProducerMetrics(metrics);
        // Partitioner configuration
        this.partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        warnIfPartitionerDeprecated();
        this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
        // Retry backoff interval (retry.backoff.ms), default 100 ms
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        // Key/value serializer configuration
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                        Serializer.class);
            this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                        Serializer.class);
            this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }
        // Interceptor configuration
        List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.createConfiguredInterceptors(config,
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
                this.valueSerializer, interceptorList, reporters);
        // Maximum size of a single request sent to the cluster (max.request.size), default 1 MB
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        // Total accumulator buffer memory (buffer.memory), default 32 MB
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        // Compression type (compression.type), default none
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

        this.apiVersions = new ApiVersions();
        this.transactionManager = configureTransactionState(config, logContext);
        // There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
        boolean enableAdaptivePartitioning = partitioner == null &&
            config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
        RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
            enableAdaptivePartitioning,
            config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
        );
        // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
        // batching which in practice actually means using a batch size of 1.
        int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
        // The accumulator is created with:
        // batch.size, default 16 KB
        // compression.type, default none
        // linger.ms, default 0
        // retry.backoff.ms, default 100 ms
        // delivery.timeout.ms, default 2 minutes
        // request.timeout.ms, default 30 s
        this.accumulator = new RecordAccumulator(logContext,
                batchSize,
                this.compressionType,
                lingerMs(config),
                retryBackoffMs,
                deliveryTimeoutMs,
                partitionerConfig,
                metrics,
                PRODUCER_METRIC_GROUP_NAME,
                time,
                apiVersions,
                transactionManager,
                new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
        // Bootstrap addresses of the Kafka cluster
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        // Metadata fetched from the Kafka cluster
        if (metadata != null) {
            this.metadata = metadata;
        } else {
            // metadata.max.age.ms, default 5 minutes: how often the producer proactively refreshes its metadata
            // metadata.max.idle.ms, default 5 minutes: how long metadata for an idle topic is retained before being evicted
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);
            this.metadata.bootstrap(addresses);
        }
        this.errors = this.metrics.sensor("errors");
        // Create the sender (the I/O runnable)
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        // Start the send (I/O) thread as a daemon
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
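
As a quick reference, the knobs this constructor reads can be overridden from application code. A hedged sketch, reusing the props object from the CustomProducer sketch above, with the documented defaults shown:

java
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);      // buffer.memory, 32 MB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);             // batch.size, 16 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 0L);                 // linger.ms, 0
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");      // compression.type
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);        // retry.backoff.ms, 100 ms
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);   // delivery.timeout.ms, 2 minutes
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);     // request.timeout.ms, 30 s
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);     // max.request.size, 1 MB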

1.2 Sender-thread initialization of the producer

Click the newSender() method to see how the send thread is initialized.
KafkaProducer.java

java
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    // Maximum in-flight (unacknowledged) requests per connection, default 5
    int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
    // request.timeout.ms, default 30 s
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
    // maxInflightRequests: max in-flight requests per connection, default 5
    // reconnect.backoff.ms, default 50 ms: base wait before retrying a connection
    // reconnect.backoff.max.ms, default 1000 ms: cap on the reconnect backoff; it grows exponentially with each consecutive failure up to this maximum
    // send.buffer.bytes, default 128 KB: socket send buffer size
    // receive.buffer.bytes, default 32 KB: socket receive buffer size
    // request.timeout.ms, default 30 s
    // socket.connection.setup.timeout.ms, default 10 s: time allowed for establishing a connection to the broker before it is closed
    // socket.connection.setup.timeout.max.ms, default 30 s: on each consecutive connection failure, the setup timeout grows exponentially up to this maximum
    KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(producerConfig,
            this.metrics,
            "producer",
            logContext,
            apiVersions,
            time,
            maxInflightRequests,
            metadata,
            throttleTimeSensor);
    // acks, default -1 (all):
    // acks=0: the producer does not wait for any broker acknowledgement
    // acks=1: the leader acknowledges once it has written the record
    // acks=-1 (all): the leader and all followers in the ISR acknowledge
    short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
    // max.request.size, default 1 MB: maximum size of a single request sent to the cluster
    // retries, default Integer.MAX_VALUE
    // retry.backoff.ms, default 100 ms
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}

The Sender object is started inside a separate thread, so click Sender inside the newSender() method and locate the run() method of the Sender class.

java
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    if (transactionManager != null)
        transactionManager.setPoisonStateOnInvalidTransition(true);

    // main loop, runs until close is called
    while (running) {
        try {
            // The sender thread tries to pull data from the accumulator; right after startup there is nothing to pull yet
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
......
 }
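
KafkaThread is a thin Thread subclass whose main job is to mark the thread as a daemon, so a lingering I/O thread cannot keep the JVM alive. A plain-Java sketch of roughly what new KafkaThread(ioThreadName, this.sender, true) followed by start() amounts to:

java
// Rough plain-Java equivalent of the KafkaThread construction above (a sketch)
Thread ioThread = new Thread(sender, "kafka-producer-network-thread | " + clientId);
ioThread.setDaemon(true); // daemon: does not block JVM shutdown
ioThread.start();         // enters Sender.run() shown above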

2. Sending Data to the Buffer


3. Overall Send Flow

Click the send() method in your own CustomProducer.java. KafkaProducer.java

java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    // Let the interceptors process the outgoing record
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

Click the onSend() method to see the interceptor handling. ProducerInterceptors.java

java
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // Apply each interceptor in turn
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not propagate interceptor exception, log and continue calling other interceptors
            // be careful not to throw exception from here
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}
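
To see this hook from the user's side, here is a minimal interceptor sketch (the class name PrefixInterceptor and the prefix logic are assumptions for illustration):

java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs inside the onSend() loop above, before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "prefix-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be registered via props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixInterceptor.class.getName()), which feeds the interceptorList built in the constructor above.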

Back from the interceptor handling, click the doSend() method. KafkaProducer.java

java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // Append callback takes care of the following:
    //  - call interceptors and user callback on completion
    //  - remember partition that is calculated in RecordAccumulator.append
    AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);

    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            // Fetch metadata from Kafka; maxBlockTimeMs caps how long this may block
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        // Remaining wait time = max wait time - time already spent on metadata
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        // The (possibly refreshed) cluster metadata
        Cluster cluster = clusterAndWaitTime.cluster;
        // Serialize the key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
        // which means that the RecordAccumulator would pick a partition using built-in logic (which may
        // take into account broker load, the amount of data produced to each partition, etc.).
        // Choose a partition (using the metadata)
        int partition = partition(record, serializedKey, serializedValue, cluster);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        // Validate that the serialized size does not exceed the limits (max.request.size, default 1 MB)
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

        // A custom partitioner may take advantage on the onNewBatch callback.
        boolean abortOnNewBatch = partitioner != null;

        // Append the record to the accumulator.  Note, that the actual partition may be
        // calculated there and can be accessed via appendCallbacks.topicPartition.
        // Append the record to the accumulator; the callbacks fire when the send completes
        // The accumulator holds 32 MB (default) of memory, organized into 16 KB (default) batches
        RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
        assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            onNewBatch(record.topic(), cluster, prevPartition);

            partition = partition(record, serializedKey, serializedValue, cluster);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }

            result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
        }

        // Add the partition to the transaction (if in progress) after it has been successfully
        // appended to the accumulator. We cannot do it before because the partition may be
        // unknown or the initially selected partition may be changed when the batch is closed
        // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
        // batches from the accumulator until they have been added to the transaction.
        if (transactionManager != null) {
            transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
        }
        // A batch is full, or a new batch was created: wake up the sender thread
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        .......
    }
}
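
From the caller's perspective, the Future/Callback contract that doSend() implements looks like this. A minimal sketch, reusing the producer from the CustomProducer sketch above:

java
// The callback fires when the batch containing this record is acknowledged (or fails)
producer.send(new ProducerRecord<>("test-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // API exceptions surface here (and via the returned Future), as noted above
        System.err.println("send failed: " + exception.getMessage());
    } else {
        System.out.printf("sent to partition %d at offset %d%n",
                metadata.partition(), metadata.offset());
    }
});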

4. Partition Selection

KafkaProducer.java
A close look at the default partitioning rules:

java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // If a partition was specified explicitly, use it
    if (record.partition() != null)
        return record.partition();
    // Otherwise let the configured partitioner choose
    if (partitioner != null) {
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(
                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
        }
        return customPartition;
    }

    if (serializedKey != null && !partitionerIgnoreKeys) {
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        // No key specified: let the accumulator's built-in logic pick a (sticky) partition
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}
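
When a key is present (and partitioner.ignore.keys is false), BuiltInPartitioner.partitionForKey hashes the serialized key (murmur2) modulo the number of partitions, so records with the same key always land in the same partition. For the custom-partitioner branch, here is a minimal sketch that honors the non-negative contract checked above (the class name and the hashCode-based rule are assumptions):

java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimpleHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Must return a non-negative partition number, per the check in partition() above
        return (key == null ? 0 : key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}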

When neither key nor partition is specified: KafkaProducer.java

java
// The accumulator: 32 MB (default) of memory, 16 KB (default) batches
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
        serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);

RecordAccumulator.java

java
public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Loop to retry in case we encounter partitioner's race conditions.
        while (true) {
            // If the message doesn't have any partition affinity, so we pick a partition based on the broker
            // availability and performance.  Note, that here we peek current partition before we hold the
            // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
            // deque lock.
            final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
            final int effectivePartition;
            // Neither key nor partition specified: use the sticky-partition logic
            if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                effectivePartition = partitionInfo.partition();
            } else {
                partitionInfo = null;
                effectivePartition = partition;
            }
            ......

5. Message Size Validation

KafkaProducer.java: a close look at the size limits.

java
private void ensureValidRecordSize(int size) {
    // Maximum size of a single produce request (max.request.size), default 1 MB
    if (size > maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    // Total buffer memory (buffer.memory), default 32 MB
    if (size > totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}
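
If a record trips the first check, the producer-side limits can be raised. A hedged sketch (the values are illustrative, and the broker's message.max.bytes and the topic's max.message.bytes must also allow larger records):

java
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10 * 1024 * 1024); // raise max.request.size to 10 MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024L);   // raise buffer.memory to 64 MB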

6. Memory Pool

RecordAccumulator.java: a close look at the memory pool.

java
public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Loop to retry in case we encounter partitioner's race conditions.
        while (true) {
            // If the message doesn't have any partition affinity, so we pick a partition based on the broker
            // availability and performance.  Note, that here we peek current partition before we hold the
            // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
            // deque lock.
            final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
            final int effectivePartition;
            // Neither key nor partition specified: use the sticky-partition logic
            if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                effectivePartition = partitionInfo.partition();
            } else {
                partitionInfo = null;
                effectivePartition = partition;
            }

            // Now that we know the effective partition, let the caller know.
            setPartition(callbacks, effectivePartition);

            // check if we have an in-progress batch
            // Get or create the deque of batches for this partition
            Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
            synchronized (dq) {
                // After taking the lock, validate that the partition hasn't changed and retry.
                if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                    continue;
                // Try appending to an existing batch (fails on the first pass: no buffer or batch allocated yet)
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                if (appendResult != null) {
                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }

            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true, 0);
            }

            if (buffer == null) {
                byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                // Take the larger of batch.size (default 16 KB) and the estimated record size (bounded by max.request.size, default 1 MB), since a single record can exceed the batch size
                int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
                // This call may block if we exhausted buffer space.
                // Allocate a buffer of that size; this may block until memory is freed
                buffer = free.allocate(size, maxTimeToBlock);
                // Update the current time in case the buffer allocation blocked above.
                // NOTE: getting time may be expensive, so calling it under a lock
                // should be avoided.
                nowMs = time.milliseconds();
            }

            synchronized (dq) {
                // After taking the lock, validate that the partition hasn't changed and retry.
                if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                    continue;
                // Wrap the buffer in a new batch and append (now there is both memory and a batch object)
                RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                if (appendResult.newBatchCreated)
                    buffer = null;
                // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                boolean enableSwitch = allBatchesFull(dq);
                
                topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                return appendResult;
            }
        }
    } finally {
        // If the buffer was not handed to a batch (e.g., an exception occurred), return it to the pool
        free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
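
The free list behind free.allocate() and free.deallocate() is the heart of the memory pool: batch.size (default 16 KB) buffers are recycled instead of re-allocated, while odd-sized buffers are simply left to the garbage collector. A deliberately simplified sketch of that idea (the real BufferPool additionally enforces the 32 MB budget and blocks up to max.block.ms when memory is exhausted):

java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class SimpleBufferPool {
    private final int batchSize;                               // pooled buffer size, e.g. 16 KB
    private final Deque<ByteBuffer> free = new ArrayDeque<>(); // recycled batch-size buffers

    SimpleBufferPool(int batchSize) { this.batchSize = batchSize; }

    synchronized ByteBuffer allocate(int size) {
        if (size == batchSize && !free.isEmpty())
            return free.pollFirst();       // reuse a pooled buffer
        return ByteBuffer.allocate(size);  // oversized record: allocate a one-off buffer
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer != null && buffer.capacity() == batchSize) {
            buffer.clear();
            free.addFirst(buffer);         // return to the free list for reuse
        }
        // non-batch-size buffers are dropped and reclaimed by GC
    }
}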

7. The Sender Thread Sends the Data

KafkaProducer.java: a close look at the send thread.

java
 // A batch is full, or a new batch was created: wake up the sender thread
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
    this.sender.wakeup();
}

Step into the run() method of the sender thread.

java
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    if (transactionManager != null)
        transactionManager.setPoisonStateOnInvalidTransition(true);

    // main loop, runs until close is called
    while (running) {
        try {
            // The sender thread tries to pull data from the accumulator; right after startup there is nothing to pull yet
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
......
}

void runOnce() {
    // Transactional sends are handled first, if a transaction manager is configured
    if (transactionManager != null) {
    ......
    }
    long currentTimeMs = time.milliseconds();
    // Send the data that is ready to the brokers
    long pollTimeout = sendProducerData(currentTimeMs);
    // Wait for responses (network I/O happens here)
    client.poll(pollTimeout, currentTimeMs);
}

private long sendProducerData(long now) {
    // get the list of partitions with data ready to send
    // 1. Check which partitions in the (default 32 MB) accumulator have batches ready to send (full, or linger.ms expired)
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    // Partitions whose leader is unknown cannot be sent to; force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    // Remove nodes that are not ready to receive data
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            // Update just the readyTimeMs of the latency stats, so that it moves forward
            // every time the batch is ready (then the difference between readyTimeMs and
            // drainTimeMs would represent how long data is waiting for the node).
            this.accumulator.updateNodeLatencyStats(node.id(), now, false);
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        } else {
            // Update both readyTimeMs and drainTimeMs, this would "reset" the node
            // latency.
            this.accumulator.updateNodeLatencyStats(node.id(), now, true);
        }
    }

    // create produce requests
    // 2. Batches bound for the same broker node are drained into one request
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now);
    ......
    // 3. Send the requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}


// 3. Drill into sendProduceRequests(batches, now), which builds one produce request per broker node:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    ......
    String nodeId = Integer.toString(destination);
    // Build the client request object
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Send the request
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

// With send selected, press Ctrl+Alt+B to jump to the implementation
@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ......
    try {
        ......
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        ......
    }
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
            clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
    }
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    // Track the request in inFlightRequests until its response arrives
    this.inFlightRequests.add(inFlightRequest);
    // Queue the bytes on the selector for network transmission
    selector.send(new NetworkSend(clientRequest.destination(), send));
}
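
The inFlightRequests queue seen above is bounded by max.in.flight.requests.per.connection (default 5). A hedged sketch of the settings that interact with it; with idempotence enabled, ordering is preserved even with several requests in flight:

java
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // default
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);          // dedupe retries, keep ordering
props.put(ProducerConfig.ACKS_CONFIG, "all");                       // required by idempotence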

8. Fetching Data

Where the consumer starts fetching data: KafkaConsumer.java

java
private Fetch<K, V> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // if data is available already, return it immediately
    final Fetch<K, V> fetch = fetcher.collectFetch();
    if (!fetch.isEmpty()) {
        return fetch;
    }

    // send any new fetches (won't resend pending fetches)
    // Send the fetch requests and grab the data
    sendFetches();
    ......

    // Return the fetched data grouped by partition, at most max.poll.records (default 500) per call
    return fetcher.collectFetch();
}
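
For context, a minimal sketch of the application-side loop that drives pollForFetches() (the class name, bootstrap address, group id, and topic are assumptions):

java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // the default cap seen in collectFetch() below

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                // each poll() call ends up in pollForFetches() shown above
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(r -> System.out.printf("%s-%d@%d: %s%n",
                        r.topic(), r.partition(), r.offset(), r.value()));
            }
        }
    }
}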

Sending the fetch requests and grabbing the data: Fetcher.java

java
 private int sendFetches() {
    offsetFetcher.validatePositionsOnMetadataChange();
    return fetcher.sendFetches();
}

public synchronized int sendFetches() {
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();

    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        // Fetch request parameters:
        // fetch.max.wait.ms, default 500 ms: maximum wait time
        // fetch.min.bytes, default 1 byte: minimum data to fetch
        // fetch.max.bytes, default 50 MB: maximum data to fetch
        final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
        RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() {
            // Invoked when data is received from the broker
            @Override
            public void onSuccess(ClientResponse resp) {
                synchronized (Fetcher.this) {
                    // Process the broker's response
                    handleFetchResponse(fetchTarget, data, resp);
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (Fetcher.this) {
                    handleFetchResponse(fetchTarget, e);
                }
            }
        };
        // Send the fetch request
        final RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
        // Listen for the data returned by the broker
        future.addListener(listener);
    }

    return fetchRequestMap.size();
}


protected void handleFetchResponse(final Node fetchTarget,
                                       final FetchSessionHandler.FetchRequestData data,
                                       final ClientResponse resp) {
    try {
        ......
        for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
            ......
            ConcurrentLinkedQueue<CompletedFetch> completedFetches;
            // Add the data, grouped by partition, to the completedFetches queue
            completedFetches.add(completedFetch);
        }

        metricsManager.recordLatency(resp.requestLatencyMs());
    } finally {
        log.debug("Removing pending request for node {}", fetchTarget);
        nodesWithPendingFetchRequests.remove(fetchTarget.id());
    }
}
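
A hedged sketch of the fetch parameters referenced in the comments above, shown with their documented defaults:

java
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // fetch.max.wait.ms, 500 ms
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                 // fetch.min.bytes, 1 byte
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // fetch.max.bytes, 50 MB
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // max.partition.fetch.bytes, 1 MB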

AbstractFetch.java

java
public Fetch<K, V> collectFetch() {
    Fetch<K, V> fetch = Fetch.empty();
    Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>();
    // Maximum records processed per poll (max.poll.records), default 500
    int recordsRemaining = fetchConfig.maxPollRecords;

    try {
        // Loop until the record budget is exhausted
        while (recordsRemaining > 0) {
            if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
                // Peek at the next completed fetch in the buffer
                CompletedFetch<K, V> records = completedFetches.peek();
                // Nothing buffered: break out of the loop
                if (records == null) break;

                if (!records.initialized) {
                    try {
                        nextInLineFetch = initializeCompletedFetch(records);
                    } catch (Exception e) {
                        // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
                        // (2) there are no fetched records with actual content preceding this exception.
                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch
                        // in cases such as the TopicAuthorizationException, and the second condition ensures that no
                        // potential data loss due to an exception in a following record.
                        if (fetch.isEmpty() && FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) {
                            completedFetches.poll();
                        }
                        throw e;
                    }
                } else {
                    nextInLineFetch = records;
                }
                // Remove it from the buffer now that it is being consumed
                completedFetches.poll();
            } else if (subscriptions.isPaused(nextInLineFetch.partition)) {
                // when the partition is paused we add the records back to the completedFetches queue instead of draining
                // them so that they can be returned on a subsequent poll if the partition is resumed at that time
                log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
                pausedCompletedFetches.add(nextInLineFetch);
                nextInLineFetch = null;
            } else {
                Fetch<K, V> nextFetch = fetchRecords(recordsRemaining);
                recordsRemaining -= nextFetch.numRecords();
                fetch.add(nextFetch);
            }
        }
    } catch (KafkaException e) {
        if (fetch.isEmpty())
            throw e;
    } finally {
        // add any polled completed fetches for paused partitions back to the completed fetches queue to be
        // re-evaluated in the next poll
        completedFetches.addAll(pausedCompletedFetches);
    }

    return fetch;
}

9. Interceptors Process the Data

Inside the poll() method, click the onConsume() method.
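
For reference, a minimal ConsumerInterceptor sketch (the class name and counting logic are assumptions); onConsume() is the hook invoked from poll():

java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Runs inside poll(), before the records reach the application;
        // records may be filtered or transformed here
        System.out.println("polled " + records.count() + " records");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}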