A Look Under the Hood of Kafka Consumer
Exploring the code behind the Kafka Consumer client.
Intro
In my last post I explored the Kafka Producer internals, and it was a very fun and informative exercise. That effort naturally led me to take a look into the Kafka Consumer codebase, specifically the classic consumer, which manages consumer groups on the consumer side. There is a new group.protocol introduced in KIP-848 which moves the burden of group management from the leader consumer (the first one to join the consumer group) to the broker that coordinates that group, AKA the coordinator.
KafkaConsumer Class
As we did in the previous post, let’s look at a basic example for a plain Java Kafka Consumer:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

// Create Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// Subscribe to the topic(s)
consumer.subscribe(Collections.singletonList("test-topic"));

// Poll and consume messages
try {
    while (true) {
        // Poll for new messages with a timeout of 1 second
        // (poll(long) is deprecated; the Duration overload is the current API)
        var records = consumer.poll(Duration.ofMillis(1000));
        // Process the messages
        records.forEach(record -> {
            System.out.printf("Consumed record with key: %s, value: %s%n", record.key(), record.value());
        });
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Always close the consumer
    consumer.close();
}
Nothing too complicated. We need a broker to read data from, a consumer group, and a topic we'd like to consume. We call the magical poll method (the workhorse of this operation), loop over the returned records, and do some processing. In our case, we're doing a fancy printf, the peak of sophistication. 😉
So the public API to the consumer is KafkaConsumer. If we strip the Javadoc comments, it's a rather short class. It's actually just a facade over the different implementations of the consumer: either the classic one or the new KIP-848 one.
Let’s follow our constructor path:
// we call this first
public KafkaConsumer(Properties properties) {
    this(properties, null, null);
}

// which then calls:
public KafkaConsumer(Properties properties,
                     Deserializer<K> keyDeserializer,
                     Deserializer<V> valueDeserializer) {
    this(propsToMap(properties), keyDeserializer, valueDeserializer);
}

// which calls:
public KafkaConsumer(Map<String, Object> configs,
                     Deserializer<K> keyDeserializer,
                     Deserializer<V> valueDeserializer) {
    this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
         keyDeserializer, valueDeserializer);
}

// and finally:
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
}
Aha! We pass the ball to this delegate, which will be called in all the other important methods. For instance, the famous poll method in KafkaConsumer is nothing but:
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return delegate.poll(timeout);
}
So this delegate is what we are actually interested in. delegate is of type ConsumerDelegate<K, V>, an interface that has all the KafkaConsumer methods.
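A minimal sketch of its shape (abridged; the real interface also declares a few internal methods used by the consumer machinery):

// ConsumerDelegate extends the public Consumer interface, so both
// implementations expose the complete consumer API behind the facade.
public interface ConsumerDelegate<K, V> extends Consumer<K, V> {
    // internal-only methods omitted
}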
CREATOR is an instance of ConsumerDelegateCreator, which is basically a factory, and its create method used to instantiate the delegate is short and sweet:
public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
                                            Deserializer<K> keyDeserializer,
                                            Deserializer<V> valueDeserializer) {
    try {
        GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));

        if (groupProtocol == GroupProtocol.CONSUMER)
            return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
        else
            return new ClassicKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
    } catch (KafkaException e) {
        throw e;
    } catch (Throwable t) {
        throw new KafkaException("Failed to construct Kafka consumer", t);
    }
}
The comment above it reads:
* The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if
* it is using the new consumer group protocol (KIP-848) or if it should fall back to the existing, legacy group
* protocol. This is based on the presence and value of the {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
* configuration. If the value is present and equal to "{@code consumer}", the {@link AsyncKafkaConsumer}
* will be returned. Otherwise, the {@link ClassicKafkaConsumer} will be returned.
So if we have group.protocol=consumer, we use the new KIP-848 AsyncKafkaConsumer; otherwise we default to ClassicKafkaConsumer.
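Opting in to the new protocol is just a matter of client configuration. A minimal sketch, assuming brokers that actually support the KIP-848 protocol:

// Illustrative only: switch the consumer to the KIP-848 group protocol.
properties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
// The default value is "classic", which selects the ClassicKafkaConsumer.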
In this post, let’s stick to the classic consumer.
ClassicKafkaConsumer
I like to listen to classical music when working sometimes; it can be very calming. Orchestras come to mind when I say ClassicKafkaConsumer. I also think about the movie Whiplash, which is fantastic!
Anyway, let’s take a look at the constructor (abridged):
ClassicKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
                GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
        LogContext logContext = createLogContext(config, groupRebalanceConfig);
        this.log = logContext.logger(getClass());
        boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
        log.debug("Initializing the Kafka consumer");
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        this.time = Time.SYSTEM;
        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
        this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
        this.clientTelemetryReporter.ifPresent(reporters::add);
        this.metrics = createMetrics(config, time, reporters);
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
        this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);

        List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
        this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
        this.subscriptions = createSubscriptionState(config, logContext);
        ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                metrics.reporters(),
                interceptorList,
                Arrays.asList(this.deserializers.keyDeserializer(), this.deserializers.valueDeserializer()));
        this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        this.metadata.bootstrap(addresses);

        FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
        FetchConfig fetchConfig = new FetchConfig(config);
        this.isolationLevel = fetchConfig.isolationLevel;

        ApiVersions apiVersions = new ApiVersions();
        this.client = createConsumerNetworkClient(config,
                metrics,
                logContext,
                apiVersions,
                time,
                metadata,
                fetchMetricsManager.throttleTimeSensor(),
                retryBackoffMs,
                clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));

        this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
                config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
        );

        // no coordinator will be constructed for the default (null) group id
        if (groupId.isEmpty()) {
            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
            this.coordinator = null;
        } else {
            this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
                    logContext,
                    this.client,
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    CONSUMER_METRIC_GROUP_PREFIX,
                    this.time,
                    enableAutoCommit,
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                    clientTelemetryReporter);
        }

        this.fetcher = new Fetcher<>(
                logContext,
                this.client,
                this.metadata,
                this.subscriptions,
                fetchConfig,
                this.deserializers,
                fetchMetricsManager,
                this.time,
                apiVersions);
        this.offsetFetcher = new OffsetFetcher(logContext,
                client,
                metadata,
                subscriptions,
                time,
                retryBackoffMs,
                requestTimeoutMs,
                isolationLevel,
                apiVersions);
        this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
                client,
                retryBackoffMs,
                retryBackoffMaxMs);

        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // ...
    }
}
A lot of interesting stuff is going on here. We first see that all the major consumer properties are initialized: clientId, groupId, requestTimeoutMs (the timeout for a single request), defaultApiTimeoutMs (the timeout for a client API operation such as Fetch) and deserializers (plugins that turn key and value bytes into Java objects).
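To make that last one concrete, here's a minimal sketch of such a plugin; the StringDeserializer from our earlier example boils down to roughly this (the class name below is made up for illustration):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch of a Deserializer plugin: turn the raw bytes of a record key
// or value into a Java object (here, a UTF-8 String).
public class SimpleStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}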
After that, the building blocks of our consumer are initialized:
- assignors: to assign partitions to consumers (see the example after this list).
- subscriptions: the state of our subscriptions.
- coordinator: the group coordinator.
- Fetchers: fetcher, offsetFetcher and topicMetadataFetcher.
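As promised, here is an example of the assignors knob in action: the partition assignment strategy is pluggable via configuration. A minimal sketch, reusing the properties object from our earlier example (CooperativeStickyAssignor is one of the built-in strategies):

// Illustrative only: choose one of the built-in assignors for the classic protocol.
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());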
Conclusion
If you want to get in touch, you can find me on Twitter/X at @moncef_abboud.