Kafka消费者源码
1. 消费者初始化
2. 消费者初始化源码
在 main() 方法中点击 KafkaConsumer() 构造方法,跳转到 KafkaConsumer.java:
java
/**
 * Creates a consumer from a map of configuration properties.
 * Delegates to the three-argument overload, passing null for both the key and the
 * value deserializer.
 *
 * @param configs consumer configuration properties
 */
public KafkaConsumer(Map<String, Object> configs) {
this(configs, null, null);
}
/**
 * Creates a consumer from {@code Properties}.
 * Delegates to {@code KafkaConsumer(Properties, Deserializer, Deserializer)}, passing null
 * for both deserializers.
 *
 * @param properties consumer configuration properties
 */
public KafkaConsumer(Properties properties) {
this(properties, null, null);
}
/**
 * Creates a consumer from {@code Properties} with explicit deserializers.
 * The properties are converted to a map with {@code Utils.propsToMap} and passed on to the
 * Map-based overload (that overload is not shown in this excerpt).
 *
 * @param properties        consumer configuration properties
 * @param keyDeserializer   key deserializer, or null
 * @param valueDeserializer value deserializer, or null
 */
public KafkaConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
}
继续跳转,最终进入真正执行初始化工作的 KafkaConsumer(ConsumerConfig, Deserializer, Deserializer) 构造方法。
java
/**
 * Builds every internal component of the consumer from a parsed {@code ConsumerConfig}.
 * Components, in construction order: group/client ids, logging, metrics, interceptors,
 * key/value deserializers, subscription state, metadata (seeded with the bootstrap
 * addresses), network client, partition assignors, the group coordinator (only when a
 * group id is configured), and the record/offset/topic-metadata fetchers.
 *
 * <p>If any step throws, the partially built consumer is closed with a zero timeout
 * (only when {@code log} was already initialized, i.e. some internals exist). The failure
 * is then rethrown wrapped in a {@code KafkaException}.
 *
 * NOTE(review): presumably reached from the public constructors through an overload that
 * is not part of this excerpt — confirm.
 *
 * @param config            parsed consumer configuration
 * @param keyDeserializer   explicit key deserializer, or null
 * @param valueDeserializer explicit value deserializer, or null
 * @throws KafkaException if construction fails for any reason
 */
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
// Consumer group id (empty Optional when no group is configured) and client id.
this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
LogContext logContext = createLogContext(config, groupRebalanceConfig);
this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
groupId.ifPresent(groupIdStr -> {
if (groupIdStr.isEmpty()) {
log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
}
});
log.debug("Initializing the Kafka consumer");
// request.timeout.ms: maximum time to wait for a broker response (original note: default 30s).
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.metrics = createMetrics(config, time);
// retry.backoff.ms: delay between retries.
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// Consumer interceptors built from the configuration.
List<ConsumerInterceptor<K, V>> interceptorList = createConsumerInterceptors(config);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Key/value deserializers: the explicit arguments, presumably falling back to the
// configured classes when null — TODO confirm in createKey/ValueDeserializer.
this.keyDeserializer = createKeyDeserializer(config, keyDeserializer);
this.valueDeserializer = createValueDeserializer(config, valueDeserializer);
// Subscription state. Original note: this is where the starting-offset policy
// (auto.offset.reset, default "latest") is applied — TODO confirm in createSubscriptionState.
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keyDeserializer,
this.valueDeserializer, metrics.reporters(), interceptorList);
// Consumer metadata. Original notes: it also carries whether internal (system) topics
// may be consumed and whether topics may be auto-created — TODO confirm in ConsumerMetadata.
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
// Parse and validate the configured bootstrap addresses and seed the metadata with them.
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);
FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
this.isolationLevel = createIsolationLevel(config);
ApiVersions apiVersions = new ApiVersions();
// Network client used by all components below to talk to the brokers.
this.client = createConsumerNetworkClient(config,
metrics,
logContext,
apiVersions,
time,
metadata,
fetchMetricsManager.throttleTimeSensor(),
retryBackoffMs);
// Partition assignors instantiated from partition.assignment.strategy.
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
);
// no coordinator will be constructed for the default (null) group id
if (!groupId.isPresent()) {
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
this.coordinator = null;
} else {
// Create the consumer group coordinator.
// auto.commit.interval.ms: auto-commit interval (original note: default 5s).
this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
logContext,
this.client,
assignors,
this.metadata,
this.subscriptions,
metrics,
CONSUMER_METRIC_GROUP_PREFIX,
this.time,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
}
// Fetch configuration. Original notes on the settings it covers (Kafka defaults):
//   fetch.min.bytes           - minimum bytes per fetch, default 1 byte
//   fetch.max.bytes           - maximum bytes per fetch, default 50 MB
//   fetch.max.wait.ms         - maximum time a fetch waits, default 500 ms
//   max.partition.fetch.bytes - maximum bytes per partition, default 1 MB
//   max.poll.records          - maximum records returned by one poll(), default 500
// plus the key/value deserializers passed in explicitly.
FetchConfig<K, V> fetchConfig = createFetchConfig(config, this.keyDeserializer, this.valueDeserializer);
this.fetcher = new Fetcher<>(
logContext,
this.client,
this.metadata,
this.subscriptions,
fetchConfig,
fetchMetricsManager,
this.time);
this.offsetFetcher = new OffsetFetcher(logContext,
client,
metadata,
subscriptions,
time,
retryBackoffMs,
requestTimeoutMs,
isolationLevel,
apiVersions);
this.topicMetadataFetcher = new TopicMetadataFetcher(logContext, client, retryBackoffMs);
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
config.logUnused();
AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
if (this.log != null) {
close(Duration.ZERO, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
3. 消费者订阅主题
在自己编写的 CustomConsumer.java 中点击 subscribe() 方法,跳转到 KafkaConsumer.java:
java
/**
 * Subscribes to the given topics, replacing the previous topic subscription.
 * An empty collection is handled exactly like {@code unsubscribe()}.
 *
 * @param topics   topics to subscribe to; must not be null and must not contain null/blank names
 * @param listener rebalance listener handed to the subscription state
 * @throws IllegalArgumentException if {@code topics} is null or contains a null/blank topic
 */
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            // Subscribing to nothing is treated the same as unsubscribing.
            this.unsubscribe();
            return;
        }
        for (String candidate : topics) {
            if (Utils.isBlank(candidate)) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
        }
        throwIfNoAssignorsConfigured();
        // Per the helper's name: discard buffered fetch data for topics not in the new subscription.
        fetcher.clearBufferedDataForUnassignedTopics(topics);
        log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
        // Only ask for a metadata refresh when the subscribed topic set actually changed.
        boolean changed = this.subscriptions.subscribe(new HashSet<>(topics), listener);
        if (changed) {
            metadata.requestUpdateForNewTopics();
        }
    } finally {
        release();
    }
}
// Call site in KafkaConsumer.subscribe(Collection, ConsumerRebalanceListener) above:
//     if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
//         metadata.requestUpdateForNewTopics();
// The subscription state is updated first. A metadata refresh is requested only when the
// subscribe(Set, ConsumerRebalanceListener) method below reports that the topic set changed.
/**
 * Switches this subscription to automatic topic subscription for {@code topics}.
 * The rebalance listener is registered and the subscription type is set to AUTO_TOPICS
 * before the topic set is replaced.
 *
 * @param topics   the complete set of topics to subscribe to
 * @param listener rebalance listener to register
 * @return {@code true} if the subscribed topic set changed, {@code false} if it was already identical
 */
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    this.registerRebalanceListener(listener);
    this.setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    final boolean subscriptionChanged = this.changeSubscription(topics);
    return subscriptionChanged;
}
/**
 * Replaces the current topic subscription if it differs from {@code topicsToSubscribe}.
 *
 * @param topicsToSubscribe the new set of subscribed topics
 * @return {@code true} if the subscription was replaced, {@code false} if it was equal already
 */
private boolean changeSubscription(Set<String> topicsToSubscribe) {
    final boolean unchanged = subscription.equals(topicsToSubscribe);
    if (!unchanged) {
        subscription = topicsToSubscribe;
    }
    return !unchanged;
}
// Invoked when the subscribed topic set has changed and metadata must be refreshed.
/**
 * Requests an immediate partial metadata update for newly subscribed topics.
 *
 * @return the value of {@code updateVersion} at the time of the request
 */
public synchronized int requestUpdateForNewTopics() {
    // Override the timestamp of last refresh to let immediate update.
    this.lastRefreshMs = 0;
    this.needPartialUpdate = true;
    this.requestVersion += 1;
    final int currentUpdateVersion = this.updateVersion;
    return currentUpdateVersion;
}
4. 消费者拉取和处理数据
消费总体流程
KafkaConsumer.java
java
/**
 * Core poll loop: repeatedly refreshes assignment metadata and fetches until records are
 * available or {@code timer} expires.
 *
 * @param timer                    overall time budget for this poll
 * @param includeMetadataInTimeout if true, the metadata/assignment update is bounded by
 *                                 {@code timer} and does not wait for the group join;
 *                                 otherwise it loops with an unbounded timer until it succeeds
 * @return the fetched records after passing through the interceptors, or
 *         {@code ConsumerRecords.empty()} if the timer expires first
 * @throws IllegalStateException if the consumer has neither a subscription nor a manual assignment
 */
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
// Record the poll start time in the consumer metrics.
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
// Step 1: consumer / consumer-group initialization (coordinator, group join, fetch positions).
// try to update assignment metadata BUT do not need to block on the timer for join group
updateAssignmentMetadataIfNeeded(timer, false);
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}
// Step 2: fetch records.
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
if (fetch.records().isEmpty()) {
log.trace("Returning empty records from `poll()` "
+ "since the consumer's position has advanced for at least one topic partition");
}
// Step 3: run the records through the consumer interceptors before returning them.
return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
5. 消费者/消费者组初始化
KafkaConsumer.java
java
/**
 * Lets the group coordinator (if any) make progress, then updates fetch positions.
 * Fetch positions are only updated once the coordinator poll has succeeded.
 *
 * @param timer             time budget for this step
 * @param waitForJoinGroup  whether the coordinator may block on joining the group
 * @return {@code false} if the coordinator poll did not complete, otherwise the result of
 *         {@code updateFetchPositions(timer)}
 */
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    final boolean coordinatorReady = coordinator == null || coordinator.poll(timer, waitForJoinGroup);
    return coordinatorReady && updateFetchPositions(timer);
}
/**
 * Coordinator-side work done on every consumer poll: heartbeat bookkeeping, coordinator
 * discovery, (re)joining the group for auto-assigned partitions, and a possible async
 * auto-commit.
 *
 * @param timer            time budget for this call
 * @param waitForJoinGroup if false, the group join is attempted with a zero timer
 * @return {@code false} if the coordinator could not be found, metadata could not be
 *         refreshed, or the group could not be joined in time; {@code true} otherwise
 * @throws IllegalStateException if partitions are auto-assigned but no assignment protocol is configured
 */
public boolean poll(Timer timer, boolean waitForJoinGroup) {
// Refresh subscription metadata if needed (exact conditions live in maybeUpdateSubscriptionMetadata).
maybeUpdateSubscriptionMetadata();
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.hasAutoAssignedPartitions()) {
if (protocol == null) {
throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
" to empty while trying to subscribe for group protocol to auto assign partitions");
}
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
// NOTE(review): this call records the poll for the heartbeat logic; it does not itself send a
// heartbeat every 3s. Heartbeats go out every heartbeat.interval.ms (default 3s), presumably
// from the heartbeat thread mentioned above — confirm in AbstractCoordinator.
pollHeartbeat(timer.currentTimeMs());
// If the coordinator is unknown, try to locate it (FindCoordinator); give up this poll if still not ready.
if (coordinatorUnknownAndUnreadySync(timer)) {
return false;
}
// Join (or re-join) the consumer group if required.
if (rejoinNeededOrPending()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
// For consumer group that uses pattern-based subscription, after a topic is created,
// any consumer that discovers the topic after metadata refresh can trigger rebalance
// across the entire consumer group. Multiple rebalances can be triggered after one topic
// creation if consumers refresh metadata at vastly different times. We can significantly
// reduce the number of rebalances caused by single topic creation by asking consumer to
// refresh metadata before re-joining the group as long as the refresh backoff time has
// passed.
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}
if (!client.ensureFreshMetadata(timer)) {
return false;
}
maybeUpdateSubscriptionMetadata();
}
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
// since we may use a different timer in the callee, we'd still need
// to update the original timer's current time after the call
timer.update(time.milliseconds());
return false;
}
}
} else {
// For manually assigned partitions, we do not try to pro-actively lookup coordinator;
// instead we only try to refresh metadata when necessary.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
// if there is pending coordinator requests, ensure they have a chance to be transmitted.
client.pollNoWakeup();
}
// Possibly trigger an asynchronous auto-commit of offsets (conditions are inside
// maybeAutoCommitOffsetsAsync, not shown here).
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
/**
 * Returns {@code true} only when the coordinator is unknown and could not be made ready
 * within {@code timer}. No lookup is attempted if the coordinator is already known.
 */
private boolean coordinatorUnknownAndUnreadySync(Timer timer) {
    if (!coordinatorUnknown()) {
        return false;
    }
    return !ensureCoordinatorReady(timer);
}
/**
 * Blocks (up to {@code timer}) until the coordinator is known, with wakeups enabled.
 *
 * @return whether the coordinator is known when the call returns
 */
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    final boolean disableWakeup = false;
    return this.ensureCoordinatorReady(timer, disableWakeup);
}
/**
 * Repeatedly looks up the group coordinator until it is known or {@code timer} expires.
 *
 * @param timer         time budget for the lookup
 * @param disableWakeup passed through to {@code client.poll}
 * @return whether the coordinator is known when the method returns
 * @throws RuntimeException a previously recorded fatal FindCoordinator error, rethrown once and then cleared
 */
private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean disableWakeup) {
// Fast path: the coordinator is already known.
if (!coordinatorUnknown())
return true;
// Otherwise keep sending FindCoordinator requests until a coordinator is found or the timer expires.
do {
// A fatal lookup error (presumably stored by the FindCoordinator response handling) is thrown once, then reset.
if (fatalFindCoordinatorException != null) {
final RuntimeException fatalException = fatalFindCoordinatorException;
fatalFindCoordinatorException = null;
throw fatalException;
}
// Get the in-flight FindCoordinator request, or start a new one.
final RequestFuture<Void> future = lookupCoordinator();
// Drive network I/O until the lookup completes or the timer runs out.
client.poll(future, timer, disableWakeup);
if (!future.isDone()) {
// ran out of time
break;
}
// (Handling of the completed lookup is elided in this excerpt.)
......
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
/**
 * Returns the in-flight FindCoordinator request, starting one if none is pending.
 * Only one lookup is kept in flight: later callers share the cached future.
 *
 * @return the pending lookup future, or {@code RequestFuture.noBrokersAvailable()} when no
 *         broker node can be chosen to send the request to
 */
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture != null) {
        return findCoordinatorFuture;
    }
    // Ask the least-loaded known broker where the coordinator is.
    final Node target = this.client.leastLoadedNode();
    if (target == null) {
        log.debug("No broker available to send FindCoordinator request");
        return RequestFuture.noBrokersAvailable();
    }
    findCoordinatorFuture = sendFindCoordinatorRequest(target);
    return findCoordinatorFuture;
}
/**
 * Sends a FindCoordinator request for this consumer group to {@code node}.
 *
 * @param node broker to ask
 * @return a future completed by {@code FindCoordinatorResponseHandler} when the response arrives
 */
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    log.debug("Sending FindCoordinator request to broker {}", node);
    // The lookup key is the group id; the key type marks it as a group coordinator lookup.
    FindCoordinatorRequestData requestData = new FindCoordinatorRequestData();
    requestData.setKeyType(CoordinatorType.GROUP.id());
    requestData.setKey(this.rebalanceConfig.groupId);
    FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(requestData);
    return client.send(node, builder).compose(new FindCoordinatorResponseHandler());
}
- 拦截器处理数据
在 poll() 方法中点击 onConsume() 方法,跳转到:
ConsumerInterceptors.java
java
// Records returned by poll() pass through the interceptors first.
/**
 * Runs {@code records} through every configured interceptor in order. Each interceptor
 * receives the output of the previous one.
 *
 * <p>An exception thrown by an interceptor is logged and not propagated. In that case the
 * records from the previous step are passed on unchanged to the next interceptor.
 *
 * @param records records fetched by the consumer
 * @return the records as transformed by the interceptor chain
 */
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    ConsumerRecords<K, V> current = records;
    for (ConsumerInterceptor<K, V> each : this.interceptors) {
        try {
            current = each.onConsume(current);
        } catch (Exception ex) {
            // Deliberately best-effort: one failing interceptor must not break consumption.
            log.warn("Error executing interceptor onConsume callback", ex);
        }
    }
    return current;
}
6. 消费者Offset提交
- 手动同步提交 Offset(KafkaConsumer.java)
java
/**
 * Synchronously commits the consumed offsets of all assigned partitions, bounded by
 * {@code default.api.timeout.ms} ({@code defaultApiTimeoutMs}).
 */
@Override
public void commitSync() {
    final Duration timeout = Duration.ofMillis(defaultApiTimeoutMs);
    commitSync(timeout);
}
/**
 * Synchronously commits the offsets returned by {@code subscriptions.allConsumed()}.
 *
 * @param timeout maximum time to wait for the commit
 */
@Override
public void commitSync(Duration timeout) {
    this.commitSync(subscriptions.allConsumed(), timeout);
}
/**
 * Synchronously commits {@code offsets} through the group coordinator.
 * The commit duration is recorded in the consumer metrics whether it succeeds or not.
 *
 * @param offsets offsets to commit, keyed by partition (copied before being handed on)
 * @param timeout maximum time to wait for the commit
 * @throws TimeoutException if the commit did not succeed before {@code timeout} expired
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
    acquireAndEnsureOpen();
    final long startNanos = time.nanoseconds();
    try {
        maybeThrowInvalidGroupIdException();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            updateLastSeenEpochIfNewer(entry.getKey(), entry.getValue());
        }
        final Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(offsets);
        final boolean committed = coordinator.commitOffsetsSync(snapshot, time.timer(timeout));
        if (!committed) {
            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                    "committing offsets " + offsets);
        }
    } finally {
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - startNanos);
        release();
    }
}
- 手动异步提交 Offset
java
/**
 * Asynchronously commits the consumed offsets of all assigned partitions, without a callback.
 */
@Override
public void commitAsync() {
    final OffsetCommitCallback noCallback = null;
    commitAsync(noCallback);
}
/**
 * Asynchronously commits the offsets returned by {@code subscriptions.allConsumed()}.
 *
 * @param callback invoked when the commit completes; may be null
 */
@Override
public void commitAsync(OffsetCommitCallback callback) {
    this.commitAsync(subscriptions.allConsumed(), callback);
}
/**
 * Asynchronously commits {@code offsets} through the group coordinator. The returned
 * commit future is not exposed to the caller; completion is reported via {@code callback}.
 *
 * @param offsets  offsets to commit, keyed by partition (copied before being handed on)
 * @param callback invoked when the commit completes; may be null
 */
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    acquireAndEnsureOpen();
    try {
        maybeThrowInvalidGroupIdException();
        log.debug("Committing offsets: {}", offsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            updateLastSeenEpochIfNewer(entry.getKey(), entry.getValue());
        }
        final Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(offsets);
        coordinator.commitOffsetsAsync(snapshot, callback);
    } finally {
        release();
    }
}
/**
 * Commits {@code offsets} asynchronously through the group coordinator.
 *
 * <p>Three paths:
 * <ul>
 *   <li>empty offsets: committed immediately, without checking the coordinator;</li>
 *   <li>coordinator known/ready: the commit is sent right away;</li>
 *   <li>coordinator unknown: the commit is chained onto a coordinator lookup. It is sent
 *       if the lookup succeeds; if the lookup fails, a {@code RetriableCommitFailedException}
 *       completion is queued for the callback.</li>
 * </ul>
 * In every case the client is polled once without blocking so the request can be transmitted.
 *
 * @param offsets  offsets to commit, keyed by partition
 * @param callback invoked when the commit completes; may be null
 * @return the commit future, or {@code null} when the commit was chained onto a coordinator lookup
 */
public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
RequestFuture<Void> future = null;
if (offsets.isEmpty()) {
// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
future = doCommitOffsetsAsync(offsets, callback);
} else if (!coordinatorUnknownAndUnreadyAsync()) {
// we need to make sure coordinator is ready before committing, since
// this is for async committing we do not try to block, but just try once to
// clear the previous discover-coordinator future, resend, or get responses;
// if the coordinator is not ready yet then we would just proceed and put that into the
// pending requests, and future poll calls would still try to complete them.
//
// the key here though is that we have to try sending the discover-coordinator if
// it's not known or ready, since this is the only place we can send such request
// under manual assignment (there we would not have heartbeat thread trying to auto-rediscover
// the coordinator).
future = doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
// This listener observes the coordinator LOOKUP, not the commit itself:
// on success the commit is sent; on failure a retriable failure is queued for the callback.
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
}
@Override
public void onFailure(RuntimeException e) {
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
}
});
}
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
return future;
}