Skip to content

Kafka消费者源码

1. 消费者初始化

（图：消费者初始化流程，原图缺失）

2. 消费者初始化源码

在 main() 方法中点击 KafkaConsumer() 构造方法，跳转到 KafkaConsumer.java。

java
/**
 * Creates a consumer from a map of configuration entries.
 *
 * <p>No deserializer instances are supplied: both are passed down as {@code null}, so the
 * delegated constructor has to obtain them elsewhere (presumably from the key/value
 * deserializer configs — confirm in the delegated constructor).
 *
 * @param configs consumer configuration entries
 */
public KafkaConsumer(Map<String, Object> configs) {
    this(configs, null, null);
}

/**
 * Creates a consumer from {@link Properties}.
 *
 * <p>Equivalent to the three-argument {@code Properties} constructor with both deserializers
 * set to {@code null}.
 *
 * @param properties consumer configuration
 */
public KafkaConsumer(Properties properties) {
    this(properties,
         null,
         null);
}

/**
 * Creates a consumer from {@link Properties} and explicit deserializers.
 *
 * <p>The properties are converted to a {@code Map} and the call is forwarded to the
 * map-based constructor.
 *
 * @param properties        consumer configuration
 * @param keyDeserializer   key deserializer, or {@code null} (see the single-argument constructors)
 * @param valueDeserializer value deserializer, or {@code null} (see the single-argument constructors)
 */
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    this(Utils.propsToMap(properties),
         keyDeserializer,
         valueDeserializer);
}

跳转到 KafkaConsumer 构造方法。

java
/**
 * Main constructor: builds all internal components of the consumer from {@code config}.
 *
 * <p>Components are created in dependency order: the metadata before the network client, and
 * the network client before the coordinator and the fetchers that use it. If any step throws,
 * the partially constructed consumer is closed (only once {@code log} has been set, i.e. once
 * something was initialized) and the failure is rethrown wrapped in a {@link KafkaException}.
 *
 * @param config            parsed consumer configuration
 * @param keyDeserializer   key deserializer; the convenience constructors pass {@code null} down
 *                          this chain, presumably meaning "create it from config" (see
 *                          createKeyDeserializer — TODO confirm)
 * @param valueDeserializer value deserializer; same convention as {@code keyDeserializer}
 * @throws KafkaException if construction fails for any reason
 */
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                GroupRebalanceConfig.ProtocolType.CONSUMER);
        // Consumer group id (empty Optional when no group is configured) and client id.
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
        LogContext logContext = createLogContext(config, groupRebalanceConfig);
        this.log = logContext.logger(getClass());
        // Effective auto-commit flag; the helper may override the configured value (see
        // maybeOverrideEnableAutoCommit).
        boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
        groupId.ifPresent(groupIdStr -> {
            if (groupIdStr.isEmpty()) {
                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
            }
        });

        log.debug("Initializing the Kafka consumer");
        // Maximum time to wait for a broker response (request.timeout.ms; 30 s by default per
        // the Kafka consumer config docs).
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        this.time = Time.SYSTEM;
        this.metrics = createMetrics(config, time);
        // Backoff between retries (retry.backoff.ms).
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // User-configured consumer interceptors.
        List<ConsumerInterceptor<K, V>> interceptorList = createConsumerInterceptors(config);
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        // Key and value deserializers (explicit instances or created from config).
        this.keyDeserializer = createKeyDeserializer(config, keyDeserializer);
        this.valueDeserializer = createValueDeserializer(config, valueDeserializer);
        // Subscription / assignment state. The original note says the starting-offset policy
        // (auto.offset.reset, default "latest") is configured here — not visible in this excerpt.
        this.subscriptions = createSubscriptionState(config, logContext);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keyDeserializer,
                this.valueDeserializer, metrics.reporters(), interceptorList);
        // Cluster metadata. Per the original notes, ConsumerMetadata also reads whether internal
        // topics may be consumed and whether topics may be auto-created (not visible here).
        this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
        // Seed the metadata with the bootstrap broker addresses parsed from the config.
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        this.metadata.bootstrap(addresses);

        FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
        this.isolationLevel = createIsolationLevel(config);

        ApiVersions apiVersions = new ApiVersions();
        // Network client shared by the coordinator, the fetchers and the metadata fetcher below.
        this.client = createConsumerNetworkClient(config,
                metrics,
                logContext,
                apiVersions,
                time,
                metadata,
                fetchMetricsManager.throttleTimeSensor(),
                retryBackoffMs);

        // Partition assignors from partition.assignment.strategy; they receive the original
        // configs together with this consumer's client.id.
        this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
        );

        // no coordinator will be constructed for the default (null) group id
        if (!groupId.isPresent()) {
            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
            config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
            this.coordinator = null;
        } else {
            // Group coordinator client. The auto-commit interval (auto.commit.interval.ms,
            // 5 s by default per the Kafka docs) is passed in below.
            this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
                    logContext,
                    this.client,
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    CONSUMER_METRIC_GROUP_PREFIX,
                    this.time,
                    enableAutoCommit,
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
        }
        // Fetch settings. Per the original notes (Kafka defaults, not visible here):
        //   fetch.min.bytes           = 1 byte   (minimum bytes per fetch)
        //   fetch.max.bytes           = 50 MB    (maximum bytes per fetch)
        //   fetch.max.wait.ms         = 500 ms   (maximum wait per fetch)
        //   max.partition.fetch.bytes = 1 MB     (maximum bytes per partition)
        //   max.poll.records          = 500      (maximum records returned by one poll)
        // The key/value deserializers are passed in explicitly.
        FetchConfig<K, V> fetchConfig = createFetchConfig(config, this.keyDeserializer, this.valueDeserializer);
        this.fetcher = new Fetcher<>(
                logContext,
                this.client,
                this.metadata,
                this.subscriptions,
                fetchConfig,
                fetchMetricsManager,
                this.time);
        this.offsetFetcher = new OffsetFetcher(logContext,
                client,
                metadata,
                subscriptions,
                time,
                retryBackoffMs,
                requestTimeoutMs,
                isolationLevel,
                apiVersions);
        this.topicMetadataFetcher = new TopicMetadataFetcher(logContext, client, retryBackoffMs);

        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);

        config.logUnused();
        AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
        if (this.log != null) {
            close(Duration.ZERO, true);
        }
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}

3. 消费者订阅主题

（图：消费者订阅主题流程，原图缺失）

在自己编写的 CustomConsumer.java 中点击 subscribe() 方法，跳转到 KafkaConsumer.java。

java
/**
 * Subscribes to the given topics, replacing any previous subscription.
 *
 * <p>An empty collection is treated exactly like {@link #unsubscribe()}. A non-empty
 * subscription requires at least one partition assignor (checked by
 * {@code throwIfNoAssignorsConfigured()}).
 *
 * @param topics   topics to subscribe to; must not be {@code null} or contain null/blank names
 * @param listener rebalance listener to register with the subscription
 * @throws IllegalArgumentException if {@code topics} is null or contains a null/blank topic
 */
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            // Subscribing to nothing is the same as unsubscribing.
            this.unsubscribe();
            return;
        }
        final boolean containsBlankTopic = topics.stream().anyMatch(Utils::isBlank);
        if (containsBlankTopic) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
        }

        throwIfNoAssignorsConfigured();
        // Discard buffered fetch data for topics outside the new subscription (per the helper's name).
        fetcher.clearBufferedDataForUnassignedTopics(topics);
        log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
        final boolean subscriptionChanged = this.subscriptions.subscribe(new HashSet<>(topics), listener);
        if (subscriptionChanged) {
            // Only ask for a metadata refresh when the topic set actually changed.
            metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}

// Excerpt of the condition inside subscribe(): subscriptions.subscribe(...) reports whether the
// subscribed topic set changed; only then is a metadata update for the new topics requested.
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))

/**
 * Switches this subscription state to automatic, topic-based assignment for {@code topics}.
 *
 * @param topics   the complete set of topics to subscribe to
 * @param listener rebalance listener to register (invoked around group rebalances)
 * @return {@code true} if the subscribed topic set changed, {@code false} otherwise
 */
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    // The listener is registered before anything else changes.
    registerRebalanceListener(listener);
    // Topics are named explicitly; their partitions are assigned automatically.
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    final boolean topicsChanged = changeSubscription(topics);
    return topicsChanged;
}

/**
 * Replaces the current subscription with {@code topicsToSubscribe} unless the two are equal.
 *
 * @param topicsToSubscribe the new topic set (stored by reference, not copied)
 * @return {@code true} if the subscription was replaced, {@code false} if it was already equal
 */
private boolean changeSubscription(Set<String> topicsToSubscribe) {
    final boolean sameTopics = subscription.equals(topicsToSubscribe);
    if (!sameTopics) {
        subscription = topicsToSubscribe;
    }
    return !sameTopics;
}

/**
 * Requests a partial metadata update because the set of subscribed topics changed.
 *
 * <p>Resetting the last-refresh timestamp makes the update eligible immediately.
 *
 * @return the current update version (this call does not modify it)
 */
public synchronized int requestUpdateForNewTopics() {
    // Pretend the last refresh happened at time zero so the update is allowed right away.
    this.lastRefreshMs = 0;
    this.needPartialUpdate = true;
    this.requestVersion += 1;
    return this.updateVersion;
}

4. 消费者拉取和处理数据

（图：消费总体流程，原图缺失）

KafkaConsumer.java

java
/**
 * Internal poll loop.
 *
 * <p>Until {@code timer} expires, each iteration first updates group membership / fetch
 * positions and then tries to collect fetched records. As soon as a non-empty {@code Fetch} is
 * available, the next fetch requests are sent (pipelining) and the records are returned after
 * passing through the consumer interceptors.
 *
 * @param timer                    overall time budget for this call
 * @param includeMetadataInTimeout if {@code true}, the assignment/metadata update is attempted
 *                                 once per iteration without blocking on join-group; if
 *                                 {@code false}, it is retried with an unbounded timer until it
 *                                 succeeds
 * @return the fetched records after interceptors, or an empty result if the timer expired
 * @throws IllegalStateException if the consumer has neither a subscription nor an assignment
 */
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        // Record the poll start time for consumer metrics.
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        do {
            client.maybeTriggerWakeup();

            if (includeMetadataInTimeout) {
                // Step 1: consumer / consumer-group initialization (coordinator poll + fetch positions).
                // try to update assignment metadata BUT do not need to block on the timer for join group
                updateAssignmentMetadataIfNeeded(timer, false);
            } else {
                // Retry with an unbounded timer (ignoring the caller's timer) until metadata is ready.
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                    log.warn("Still waiting for metadata");
                }
            }
            // Step 2: collect fetched data.
            final Fetch<K, V> fetch = pollForFetches(timer);
            if (!fetch.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                if (fetch.records().isEmpty()) {
                    log.trace("Returning empty records from `poll()` "
                            + "since the consumer's position has advanced for at least one topic partition");
                }
                // Step 3: run the records through the consumer interceptors before returning them.
                return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        // Poll end is recorded on every exit path, including exceptions.
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

5. 消费者/消费者组初始化

KafkaConsumer.java

java
/**
 * Lets the group coordinator (if any) do its per-poll work, then refreshes fetch positions.
 *
 * @param timer            time budget for the coordinator poll and the position update
 * @param waitForJoinGroup whether the coordinator may block on joining the group
 * @return {@code false} if the coordinator poll did not complete; otherwise the result of
 *         {@code updateFetchPositions(timer)}
 */
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    // With no group id there is no coordinator, so there is nothing to wait for.
    final boolean coordinatorDone = coordinator == null || coordinator.poll(timer, waitForJoinGroup);
    return coordinatorDone && updateFetchPositions(timer);
}


/**
 * Coordinator work performed on every consumer poll.
 *
 * <p>The steps are:
 * <ol>
 *   <li>refresh subscription metadata;</li>
 *   <li>run completed offset-commit callbacks;</li>
 *   <li>for auto-assigned partitions, keep the heartbeat alive, make sure the coordinator is
 *       known and (re)join the group when needed; for manual assignment, only refresh metadata
 *       when necessary;</li>
 *   <li>possibly auto-commit offsets.</li>
 * </ol>
 *
 * @param timer            time budget for blocking operations
 * @param waitForJoinGroup if {@code false}, joining the group is attempted with a zero timer
 *                         instead of blocking on {@code timer}
 * @return {@code false} if the coordinator is unknown and could not be found, fresh metadata
 *         could not be obtained, or the group could not be (re)joined in time; otherwise {@code true}
 * @throws IllegalStateException if partitions are auto-assigned but no assignment protocol is configured
 */
public boolean poll(Timer timer, boolean waitForJoinGroup) {
    // Bring the subscription up to date with the latest metadata, if needed.
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.hasAutoAssignedPartitions()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        // Note: this records the poll time; it does not itself send a heartbeat. Heartbeats come from the
        // heartbeat thread (heartbeat.interval.ms, 3 s by default per the Kafka docs).
        pollHeartbeat(timer.currentTimeMs());
        // Make sure the group coordinator is known and ready; give up for this round if it is not.
        if (coordinatorUnknownAndUnreadySync(timer)) {
            return false;
        }
        // Check whether this member needs to (re)join the consumer group.
        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            // if not wait for join group, we would just use a timer of 0
            if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
                // since we may use a different timer in the callee, we'd still need
                // to update the original timer's current time after the call
                timer.update(time.milliseconds());

                return false;
            }
        }
    } else {
        // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
        // instead we only try to refresh metadata when necessary.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }

        // if there is pending coordinator requests, ensure they have a chance to be transmitted.
        client.pollNoWakeup();
    }
    // Trigger an asynchronous offset auto-commit if one is due (presumably a no-op when
    // auto-commit is disabled — see maybeAutoCommitOffsetsAsync).
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

/**
 * Reports whether the coordinator is unknown and also could not be made ready within {@code timer}.
 *
 * @param timer time budget for the blocking coordinator lookup
 * @return {@code true} only if the coordinator is unknown and the lookup did not succeed
 */
private boolean coordinatorUnknownAndUnreadySync(Timer timer) {
    if (!coordinatorUnknown()) {
        return false;
    }
    return !ensureCoordinatorReady(timer);
}
/**
 * Tries, within {@code timer}, to discover the coordinator, with wakeups not disabled.
 *
 * @param timer time budget for the lookup
 * @return {@code true} if the coordinator is known when this returns
 */
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    final boolean disableWakeup = false;
    return ensureCoordinatorReady(timer, disableWakeup);
}

/**
 * Repeatedly looks up the group coordinator until it is known or {@code timer} expires.
 *
 * <p>NOTE(review): part of the loop body is elided ("......") in this excerpt, so the snippet is
 * not valid Java as written.
 *
 * @param timer         time budget for the lookup
 * @param disableWakeup forwarded to {@code client.poll}; presumably suppresses consumer wakeups
 *                      while polling — confirm
 * @return {@code true} if the coordinator is known when this returns
 * @throws RuntimeException a fatal error stored in {@code fatalFindCoordinatorException} by an
 *                          earlier lookup; the field is cleared before the error is thrown
 */
private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean disableWakeup) {
    // Fast path: the coordinator is already known.
    if (!coordinatorUnknown())
        return true;
    // Otherwise keep sending FindCoordinator requests until the coordinator is found or time runs out.
    do {
        // Surface (once) a fatal error recorded by a previous lookup.
        if (fatalFindCoordinatorException != null) {
            final RuntimeException fatalException = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null;
            throw fatalException;
        }
        // Create a FindCoordinator request, or reuse the one already in flight.
        final RequestFuture<Void> future = lookupCoordinator();
        // Send it and poll the network until it completes or the timer expires.
        client.poll(future, timer, disableWakeup);

        if (!future.isDone()) {
            // ran out of time
            break;
        }
        // NOTE(review): the remainder of the loop body is elided in this excerpt.
        ......
    } while (coordinatorUnknown() && timer.notExpired());

    return !coordinatorUnknown();
}


/**
 * Returns the in-flight FindCoordinator future, starting a new lookup if none is pending.
 *
 * <p>The request goes to the least-loaded node known to the client. If no node is available,
 * a no-brokers-available future is returned and nothing is cached.
 *
 * @return the future tracking the coordinator lookup
 */
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture != null) {
        // A lookup is already in progress; share it.
        return findCoordinatorFuture;
    }
    final Node target = this.client.leastLoadedNode();
    if (target == null) {
        log.debug("No broker available to send FindCoordinator request");
        return RequestFuture.noBrokersAvailable();
    }
    findCoordinatorFuture = sendFindCoordinatorRequest(target);
    return findCoordinatorFuture;
}

/**
 * Sends a FindCoordinator request for this member's group to {@code node}.
 *
 * @param node broker to ask
 * @return a future whose outcome is produced by {@code FindCoordinatorResponseHandler}
 */
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    log.debug("Sending FindCoordinator request to broker {}", node);
    // Lookup key is the group id; key type GROUP asks for the group coordinator.
    final FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
            new FindCoordinatorRequestData()
                    .setKeyType(CoordinatorType.GROUP.id())
                    .setKey(this.rebalanceConfig.groupId));
    return client.send(node, builder).compose(new FindCoordinatorResponseHandler());
}
  1. 拦截器处理数据
    在 poll() 方法中点击 onConsume() 方法，跳转到 ConsumerInterceptors.java。
java
/**
 * Passes fetched records through every configured interceptor, in order, before they reach the user.
 *
 * <p>Each interceptor receives the previous interceptor's output. If an interceptor throws, the
 * exception is logged and the chain continues with the records as they were before that interceptor.
 *
 * @param records records returned by the fetch
 * @return the records after all interceptors have run
 */
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    ConsumerRecords<K, V> current = records;
    for (ConsumerInterceptor<K, V> next : this.interceptors) {
        try {
            current = next.onConsume(current);
        } catch (Exception e) {
            // Interceptor failures are deliberately not propagated to the caller.
            log.warn("Error executing interceptor onConsume callback", e);
        }
    }
    return current;
}

6. 消费者 Offset 提交

  1. 手动同步提交 Offset KafkaConsumer.java
java
/**
 * Synchronously commits offsets, waiting at most the default API timeout.
 *
 * <p>Equivalent to {@code commitSync(Duration.ofMillis(defaultApiTimeoutMs))}.
 */
@Override
public void commitSync() {
    final Duration timeout = Duration.ofMillis(defaultApiTimeoutMs);
    commitSync(timeout);
}


/**
 * Synchronously commits the offsets returned by {@code subscriptions.allConsumed()}.
 *
 * @param timeout maximum time to wait for the commit
 */
@Override
public void commitSync(Duration timeout) {
    this.commitSync(this.subscriptions.allConsumed(), timeout);
}


/**
 * Synchronously commits the given offsets through the group coordinator.
 *
 * <p>Commit latency is recorded in the consumer metrics whether or not the commit succeeds.
 *
 * @param offsets offsets to commit (a copy is handed to the coordinator)
 * @param timeout maximum time to wait for the commit
 * @throws TimeoutException if the commit does not succeed before {@code timeout} expires
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
    acquireAndEnsureOpen();
    final long startNanos = time.nanoseconds();
    try {
        maybeThrowInvalidGroupIdException();
        offsets.forEach(this::updateLastSeenEpochIfNewer);
        // The coordinator gets its own copy and a timer bounded by the caller's timeout.
        final boolean committed = coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout));
        if (!committed) {
            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                    "committing offsets " + offsets);
        }
    } finally {
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - startNanos);
        release();
    }
}

  2. 手动异步提交 Offset KafkaConsumer.java

java
/**
 * Asynchronously commits offsets without a completion callback.
 *
 * <p>Equivalent to {@code commitAsync(null)}.
 */
@Override
public void commitAsync() {
    this.commitAsync(null);
}

/**
 * Asynchronously commits the offsets returned by {@code subscriptions.allConsumed()}.
 *
 * @param callback invoked when the commit completes; may be {@code null}
 */
@Override
public void commitAsync(OffsetCommitCallback callback) {
    this.commitAsync(this.subscriptions.allConsumed(), callback);
}

/**
 * Asynchronously commits the given offsets through the group coordinator.
 *
 * @param offsets  offsets to commit (a copy is handed to the coordinator)
 * @param callback completion callback; may be {@code null}
 */
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        log.debug("Committing offsets: {}", offsets);
        offsets.forEach(this::updateLastSeenEpochIfNewer);
        // The coordinator works on its own snapshot; the returned future is not needed here.
        final Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(offsets);
        coordinator.commitOffsetsAsync(snapshot, callback);
    } finally {
        release();
    }
}


/**
 * Asynchronously commits offsets to the group coordinator without blocking.
 *
 * <p>Callbacks of previously completed commits are invoked first. If the offsets are empty or
 * the coordinator is ready, the commit is sent directly. Otherwise the commit is chained onto a
 * coordinator lookup. If that lookup fails, an {@code OffsetCommitCompletion} carrying a
 * {@link RetriableCommitFailedException} is queued in {@code completedOffsetCommits} for
 * {@code callback}.
 *
 * @param offsets  offsets to commit
 * @param callback completion callback; may be {@code null} (commitAsync() passes null)
 * @return the commit future, or {@code null} when the commit was deferred behind a coordinator lookup
 */
public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();

    RequestFuture<Void> future = null;
    if (offsets.isEmpty()) {
        // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
        future = doCommitOffsetsAsync(offsets, callback);
    } else if (!coordinatorUnknownAndUnreadyAsync()) {
        // we need to make sure coordinator is ready before committing, since
        // this is for async committing we do not try to block, but just try once to
        // clear the previous discover-coordinator future, resend, or get responses;
        // if the coordinator is not ready yet then we would just proceed and put that into the
        // pending requests, and future poll calls would still try to complete them.
        //
        // the key here though is that we have to try sending the discover-coordinator if
        // it's not known or ready, since this is the only place we can send such request
        // under manual assignment (there we would not have heartbeat thread trying to auto-rediscover
        // the coordinator).
        future = doCommitOffsetsAsync(offsets, callback);
    } else {
        // we don't know the current coordinator, so try to find it and then send the commit
        // or fail (we don't want recursive retries which can cause offset commits to arrive
        // out of order). Note that there may be multiple offset commits chained to the same
        // coordinator lookup request. This is fine because the listeners will be invoked in
        // the same order that they were added. Note also that AbstractCoordinator prevents
        // multiple concurrent coordinator lookup requests.
        // Count commits that are waiting on the coordinator lookup; decremented in both listener branches.
        pendingAsyncCommits.incrementAndGet();
        // Listen for the result of the coordinator lookup (not of the commit itself): on success send
        // the commit, on failure queue a retriable commit failure for the callback.
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                pendingAsyncCommits.decrementAndGet();
                doCommitOffsetsAsync(offsets, callback);
                client.pollNoWakeup();
            }

            @Override
            public void onFailure(RuntimeException e) {
                pendingAsyncCommits.decrementAndGet();
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
                        new RetriableCommitFailedException(e)));
            }
        });
    }

    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
    return future;
}