Post

Behind Sending Millions of Messages Per Second: A Look Under the Hood of Kafka Producer

Exploring the code behind the Kafka Producer client.

Behind Sending Millions of Messages Per Second: A Look Under the Hood of Kafka Producer

Intro

When you work with something for a while, you start to feel like you really know it. But just as you truly get to know someone when you live with them, you truly understand a piece of technology when you examine its code. With that in mind, let’s take a closer look at the Kafka producer and see if it turns out to be a messy roommate.

Post Goals

The canonical Kafka Producer looks as follows:

1
2
3
4
5
6
7
8
9
10
11
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("linger.ms", 1);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

Some properties, a constructor, and a simple send method. This short snippet powers workloads handling millions of messages per second. It’s quite impressive.

One goal is to examine the code behind this code to get a feel for it and demystify its workings. Another is to understand where properties like batch.size, linger.ms, acks, buffer.memory, and others fit in, how they balance latency and throughput to achieve the desired performance.

The Entrypoint: KafkaProducer class

The entrypoint to the Kafka producer is unsurprisingly the KafkaProducer class. To keep things simple, we’re going to ignore all telemetry and transaction-related code.

The Constructor

Let’s take a look at the constructor (abridged):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
    KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  ProducerMetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors<K, V> interceptors,
                  ApiVersions apiVersions,
                  Time time) {
        try {
            this.producerConfig = config;
            this.time = time;

            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            this.partitionerPlugin = Plugin.wrapInstance(
                    config.getConfiguredInstance(
                        ProducerConfig.PARTITIONER_CLASS_CONFIG,
                        Partitioner.class,
                        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
                    metrics,
                    ProducerConfig.PARTITIONER_CLASS_CONFIG);
            this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            if (keySerializer == null) {
                keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
                keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            }
            this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

            if (valueSerializer == null) {
                valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
                valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            }
            this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);


            List<ProducerInterceptor<K, V>> interceptorList = (List<ProducerInterceptor<K, V>>) ClientUtils.configuredInterceptors(config,
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList, metrics);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                    interceptorList,
                    reporters,
                    Arrays.asList(this.keySerializerPlugin.get(), this.valueSerializerPlugin.get()));
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compression = configureCompression(config);

            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = apiVersions;

            // There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
                config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
            RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
                enableAdaptivePartitioning,
                config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
            );
            // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
            // batching which in practice actually means using a batch size of 1.
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            this.accumulator = new RecordAccumulator(logContext,
                    batchSize,
                    compression,
                    lingerMs(config),
                    retryBackoffMs,
                    retryBackoffMaxMs,
                    deliveryTimeoutMs,
                    partitionerConfig,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs,
                        retryBackoffMaxMs,
                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                this.metadata.bootstrap(addresses);
            }

            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

There’s a flurry of interesting things happening here. First, let’s take note of some producer properties being fetched from the configuration.

My eyes immediately scan for BATCH_SIZE_CONFIG, lingerMs, BUFFER_MEMORY_CONFIG, and MAX_BLOCK_MS_CONFIG.

We can see CLIENT_ID_CONFIG (client.id), along with retry-related properties like RETRY_BACKOFF_MS_CONFIG and RETRY_BACKOFF_MAX_MS_CONFIG.

The constructor also attempts to dynamically load PARTITIONER_CLASS_CONFIG, which specifies a custom partitioner class. Right after that, there’s PARTITIONER_IGNORE_KEYS_CONFIG, indicating whether key hashes should be used to select a partition in the DefaultPartitioner (when no custom partitioner is provided).

Of course, we also see the Key and Value serializer plugins being initialized. Our Java object-to-bytes translators.

Two other objects are initialized, which I believe are the real workhorses:

  • this.accumulator (RecordAccumulator): Holds and accumulates the queues containing record batches.
  • this.sender (Sender): The thread that iterates over the accumulated batches and sends the ready ones over the network.

We also spot this line which validates the bootstrap servers:

1
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);

Simplified, it looks as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
List<String> urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG);
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String url : urls) {
            if (url != null && !url.isEmpty()) {
                    String host = getHost(url);
                    Integer port = getPort(url);
                    if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
                        InetAddress[] inetAddresses = InetAddress.getAllByName(host);
                        for (InetAddress inetAddress : inetAddresses) {
                            String resolvedCanonicalName = inetAddress.getCanonicalHostName();
                            InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
                            if (address.isUnresolved()) {
                                log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
                            } else {
                                addresses.add(address);
                            }
                        }
                    } else {
                        InetSocketAddress address = new InetSocketAddress(host, port);
                        if (address.isUnresolved()) {
                            log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
                        } else {
                            addresses.add(address);
                        }
                    }
            }
        }

The key objective behind RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY (KIP-235) is to handle DNS aliases. How? First, we retrieve all IPs associated with a DNS (getAllByName), then perform a reverse DNS lookup (getCanonicalHostName) to obtain the corresponding addresses. This ensures that if we have a VIP or DNS alias for multiple brokers, they are all resolved.

Anyway, the KafkaProducer constructor alone reveals a lot about what’s happening under the hood. Now, let’s take a look at the send method.

send method

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    /**
     * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
     * <p>
     * The send is asynchronous and this method will return immediately (except for rare cases described below)
     * once the record has been stored in the buffer of records waiting to be sent.
     * This allows sending many records in parallel without blocking to wait for the response after each one.
     * Can block for the following cases: 1) For the first record being sent to 
     * the cluster by this client for the given topic. In this case it will block for up to {@code max.block.ms} milliseconds if 
     * Kafka cluster is unreachable; 2) Allocating a buffer if buffer pool doesn't have any free buffers.
     * <p>
     * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to, the offset
     * it was assigned and the timestamp of the record. If the producer is configured with acks = 0, the {@link RecordMetadata}
     * will have offset = -1 because the producer does not wait for the acknowledgement from the broker.
     * If {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime} is used by the topic, the timestamp
     * will be the user provided timestamp or the record send time if the user did not specify a timestamp for the
     * record. If {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime} is used for the
     * topic, the timestamp will be the Kafka broker local time when the message is appended.
     * <p>
     * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
     * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
     * get()} on this future will block until the associated request completes and then return the metadata for the record
     * or throw any exception that occurred while sending the record.
     * <p>
     * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately
     * ...
     **/
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

The method’s description is spot on. It tells us that the method is asynchronous but may block if the cluster is unreachable or if there isn’t enough memory to allocate a buffer. We also learn that when acks=0 (AKA “fire and forget”), the producer doesn’t expect an acknowledgment from the broker and sets the result offset to -1 instead of using the actual offset returned by the broker.

Interceptors act as middleware that take in a record and return either the same record or a modified version. They can do anything from adding headers for telemetry to altering the data.

After that, doSend is invoked. We could just trust it and call it a day—interceptors and doSend should be good enough for us.

Jokes aside, here’s doSend abridged:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
        // Append callback takes care of the following:
        //  - call interceptors and user callback on completion
        //  - remember partition that is calculated in RecordAccumulator.append
        AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);

        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializerPlugin.get().serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializerPlugin.get().serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }

            // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
            // which means that the RecordAccumulator would pick a partition using built-in logic (which may
            // take into account broker load, the amount of data produced to each partition, etc.).
            int partition = partition(record, serializedKey, serializedValue, cluster);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
                    compression.type(), serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

            // Append the record to the accumulator.  Note, that the actual partition may be
            // calculated there and can be accessed via appendCallbacks.topicPartition.
            RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                    serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
            assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

            // Add the partition to the transaction (if in progress) after it has been successfully
            // appended to the accumulator. We cannot do it before because the partition may be
            // unknown. Note that the `Sender` will refuse to dequeue
            // batches from the accumulator until they have been added to the transaction.
            if (transactionManager != null) {
                transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
            }

            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {

            // ...

        }

We start by creating AppendCallbacks, which include both the user-supplied callback and interceptors (whose onAcknowledgement method will be invoked). This allows users to interact with the producer request results, whether they succeed or fail.

For each topic partition we send data to, we need to determine its leader so we can request it to persist our data. That’s where waitOnMetadata comes in. It issues a Metadata API request to one of the bootstrap servers and caches the response, preventing the need to issue a request for every record.

Next, the record’s key and value are converted from Java objects to bytes using keySerializerPlugin.get().serialize and valueSerializerPlugin.get().serialize.

Finally, we determine the record’s partition using partition(record, serializedKey, serializedValue, cluster):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * if custom partitioner is specified, call it to compute partition
     * otherwise try to calculate partition based on key.
     * If there is no key or key should be ignored return
     * RecordMetadata.UNKNOWN_PARTITION to indicate any partition
     * can be used (the partition is then calculated by built-in
     * partitioning logic).
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        if (record.partition() != null)
            return record.partition();

        if (partitionerPlugin.get() != null) {
            int customPartition = partitionerPlugin.get().partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }

        if (serializedKey != null && !partitionerIgnoreKeys) {
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }

If we have a custom partitioner, we use it. Otherwise, if we have a key and partitioner.ignore.keys is false (the default), we rely on the famous key hash by calling BuiltInPartitioner.partitionForKey, which under the hood is:

1
2
3
4
5
6
7
    /*
     * Default hashing function to choose a partition from the serialized key bytes
     */
    public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

This is so satisfying! You read about it in various documentation, and it turns out to be exactly as described—getting a partition based on the Murmur2 (a famous hashing algo) key hash.

However, if there’s no key, UNKNOWN_PARTITION is returned, and a partition is chosen using a sticky partitioner. This ensures that all partition-less records are grouped into the same partition, allowing for larger batch sizes. The partition selection also considers leader node latency statistics.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    private long batchReady(boolean exhausted, TopicPartition part, Node leader,
                            long waitedTimeMs, boolean backingOff, int backoffAttempts,
                            boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
        if (!readyNodes.contains(leader) && !isMuted(part)) {
            long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
            boolean expired = waitedTimeMs >= timeToWaitMs;
            boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
            boolean sendable = full
                    || expired
                    || exhausted
                    || closed
                    || flushInProgress()
                    || transactionCompleting;
            if (sendable && !backingOff) {
                readyNodes.add(leader);
            } else {
                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                // Note that this results in a conservative estimate since an un-sendable partition may have
                // a leader that will later be found to have sendable data. However, this is good enough
                // since we'll just wake up and then sleep again for the remaining time.
                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
            }
        }
        return nextReadyCheckDelayMs;
    }

After that we pass the ball to the RecordAccumulator using accumulator.append and it will takes care of allocating a buffer for each batch and adding the record to it.

RecordAccumulator

The class documentation reads:

1
2
3
4
5
6
7
/**
 * This class acts as a queue that accumulates records into {@link MemoryRecords}
 * instances to be sent to the server.
 * <p>
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */

and the object is instantiated within the KafkaProducer’s constructor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
this.accumulator = new RecordAccumulator(logContext,
                    batchSize,
                    compression,
                    lingerMs(config),
                    retryBackoffMs,
                    retryBackoffMaxMs,
                    deliveryTimeoutMs,
                    partitionerConfig,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

This is where batching takes place. Where the tradeoff between batch.size and linger.ms is implemented. Where retries are made. And where a produce attempt is timed out after deliveryTimeoutMs (defaults to 2 min).

The producer’s doSend calls the Accumulator’s append method:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
    public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {
        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // Loop to retry in case we encounter partitioner's race conditions.
            while (true) {
                // If the message doesn't have any partition affinity, so we pick a partition based on the broker
                // availability and performance.  Note, that here we peek current partition before we hold the
                // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
                // deque lock.
                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
                final int effectivePartition;
                if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
                    effectivePartition = partitionInfo.partition();
                } else {
                    partitionInfo = null;
                    effectivePartition = partition;
                }

                // Now that we know the effective partition, let the caller know.
                setPartition(callbacks, effectivePartition);

                // check if we have an in-progress batch
                Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }

                if (buffer == null) {
                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
                            RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
                    // This call may block if we exhausted buffer space.
                    buffer = free.allocate(size, maxTimeToBlock);
                    // Update the current time in case the buffer allocation blocked above.
                    // NOTE: getting time may be expensive, so calling it under a lock
                    // should be avoided.
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                    if (appendResult.newBatchCreated)
                        buffer = null;
                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }
            }
        } finally {
            free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

We start with TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));, in my opinion, topicInfoMap is the most important variable in this whole class. Here is its init code followed by the TopicInfo class:

1
2
3
4
5
6
7
8
9
10
11
12
13
    private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();

    /**
     * Per topic info.
     */
    private static class TopicInfo {
        public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
        public final BuiltInPartitioner builtInPartitioner;

        public TopicInfo(BuiltInPartitioner builtInPartitioner) {
            this.builtInPartitioner = builtInPartitioner;
        }
    }

We maintain a ConcurrentMap keyed by topic, where each value is a TopicInfo object. This object, in turn, holds another ConcurrentMap keyed by partition, with values being a Deque (double-ended queue) of batches. The core responsibility of RecordAccumulator is to allocate memory for these record batches and fill them with records, either until linger.ms is reached or the batch reaches its batch.size limit.

Notice how we use computeIfAbsent to retrieve the TopicInfo, and later use it again to get the ProducerBatch deque:

1
2
// Check if we have an in-progress batch  
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

This computeIfAbsent call is at the heart of the Kafka Producer batching mechanism. The send method ultimately calls append, and within it, there’s a map that holds another map, which holds a queue of batches for each partition. As long as a batch remains open (i.e. not older than linger.ms and not full up to batch.size), it’s reused and new records are appended to it and batched together.

Once we retrieve topicInfo and increment the appendsInProgress counter-used to abort batches in case of errors—we enter an infinite loop. This loop either exits with a return or an exception. It’s necessary because the target partition might change while we’re inside the loop. Remember, the Kafka Producer is designed for a multi-threaded environment and is considered thread-safe. Additionally, the batch we’re trying to append to might become full or not have enough space, requiring a retry.

Inside the loop, if the record has an UNKNOWN_PARTITION (meaning there’s no custom partitioner and no key-based partitioning), a sticky partition is selected using builtInPartitioner.peekCurrentPartitionInfo, based on broker availability and performance stats.

At this point, we have the partition’s Deque<ProducerBatch>, and we use synchronized (dq) to ensure no other threads interfere. Then, tryAppend is called:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    /**
     *  Try to append to a ProducerBatch.
     *
     *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
     *  resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
     *  and memory records built) in one of the following cases (whichever comes first): right before send,
     *  if it is expired, or when the producer is closed.
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        if (closed)
            throw new KafkaException("Producer closed while send in progress");
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            int initialBytes = last.estimatedSizeInBytes();
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null) {
                last.closeForRecordAppends();
            } else {
                int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes);
            }
        }
        return null;
    }

If the producer is not closed and there’s a producer batch in the queue, we attempt to append to it. If appending fails (future == null), we close the batch so it can be sent and removed from the queue. If it succeeds, we return a RecordAppendResult object.

Now, let’s look at if (buffer == null) inside append. This condition is met if the dequeue had no RecordBatch or if appending to an existing RecordBatch failed. In that case, we allocate a new buffer using free.allocate. This allocation process is quite interesting, and we’ll dive into it in the upcoming BufferPool section.

After allocating the buffer, appendNewBatch is called to create a new batch and add it to the queue. But before doing so, it first checks whether another thread has already created a new batch:

1
2
3
4
5
6
7
8
9
10
11
12
13
      // Inside  private RecordAppendResult appendNewBatch
      RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
        if (appendResult != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            return appendResult;
        }

        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
        ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
        FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                callbacks, nowMs));

        dq.addLast(batch);

The // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... comment is just a sight for sore eyes. When it comes to multithreading, hope is all we got.

After the batch append, we call builtInPartitioner.updatePartitionInfo which might change the sticky partition.

Finally, if the allocated buffer has not been successfully used in a new batch, it will be deallocated to free up memory.

The Magical BufferPool

So, KafkaProducer calls its send method, which in turn calls RecordAccumulator’s append method. This method is responsible for adding records to an in-memory batch until they are sent.

But how much memory do we use? How do we free memory? How do we manage each batch’s memory to ensure that no one is starved while also staying within the producer’s configured memory limit (buffer.memory, which defaults to 32 MB)?

The answers to all these questions lead us to the BufferPool class. Its documentation reads:

1
2
3
4
5
6
7
8
9
10
/**
 * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
 * particular it has the following properties:
 * <ol>
 * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
 * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
 * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
 * buffers are deallocated.
 * </ol>
 */

The BufferPool’s constructor has two key args:

1
2
3
4
5
6
7
8
/**
 * Create a new buffer pool
 *
 * @param memory The maximum amount of memory that this buffer pool can allocate
 * @param poolableSize The buffer size to cache in the free list rather than deallocating
 *...
 */
public BufferPool(long memory, int poolableSize, ...)

The BufferPool is initialized all the way in the KafkaProducer:

1
new BufferPool(this.totalMemorySize, batchSize, ...)

totalMemorySize is the famous buffer.memory which is the total amount of memory the producer can use to buffer records and batchSize is arguably the most famous producer config i.e. batch.size.

The RecordAccumulator tries to allocate buffer:

1
2
3
4
5
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
        RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);

Notice the max: the buffer’size can exceed batch.size if we have a record that exceeds that size.

free is our BufferPool and we try try to allocate our ByteBuffer and wait up to maxTimeToBlock which is none other than max.block.ms (minus any time it took to get the topic’s metadata if it was not already present).

So, how does allocate work?

We start by making sanity checks and acquiring the lock and making sure there is no contention with other thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();

        if (this.closed) {
            this.lock.unlock();
            throw new KafkaException("Producer closed while allocating memory");
        }

if BufferPool allocates a buffer of size batch.size, it will add it a Deque<ByteBuffer> aptly called free when it gets freed. If the buffer’s size is different than batch.size, it’s unlikely another request will need a similar one and it is thus simply discarded and we account for that in nonPooledAvailableMemory. Most allocation requests will be for the configured batch.size, unless there is a message that exceeds that.

1
2
3
4
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

After acquiring the lock, if the request size is equal to batch.size and there is an available buffer in our free deque, we simply grab it and return.

If not, we check if there is enough memory (pooled and unpooled), and if necessary, freeUp will free some buffer from the free deque:

1
2
3
4
5
6
7
8
9
 // now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
 if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            }

freeUp is simply grabbing buffers from free until we have enough memory:

1
2
3
4
5
6
7
8
    /**
     * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
     * buffers (if needed)
     */
    private void freeUp(int size) {
        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
    }

If there is enough memory for size, great! We simply call ByteBuffer.allocate (Java’s native allocation) using safeAllocateByteBuffer which adds a nice try to protect against OOM.

However, if we were unable to find enough memory, we enter into the following loop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
          Condition moreMemory = this.lock.newCondition();
          while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                       if (waitingTimeElapsed) {
                            this.metrics.sensor("buffer-exhausted-records").record();
                            throw new BufferExhaustedException("Failed to allocate " + size + " bytes within the configured max blocking time "
                                + maxTimeToBlockMs + " ms. Total memory: " + totalMemory() + " bytes. Available memory: " + availableMemory()
                                + " bytes. Poolable size: " + poolableSize() + " bytes");
                        }

                        remainingTimeToBlockNs -= timeNs;
                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                    }

The loop uses Condition to put the thread to sleep and release the lock. The idea is to wait for another thread to free up memory and call signal(), waking up our sleeping thread.

If our thread stops waiting because it has waited too long, an exception is thrown. Otherwise, it means another thread has freed some memory, so we check if a buffer is available in free or call freeUp to reclaim memory.

This while loop exits when enough memory has been accumulated or after maxTimeToBlockMs has elapsed.

After the loop, we encounter this nifty finally block:

1
2
3
4
5
6
7
8
9
10
11
12
13
this.waiters.remove(moreMemory);
//...
finally {
            // signal any additional waiters if there is more memory left
            // over for them
            try {
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                // Another finally... otherwise find bugs complains
                lock.unlock();
            }
        }

this.waiters is a Deque<Condition>, where all allocate calls that are waiting for memory get added.

Before exiting, we first remove the current thread’s Condition from waiters, since it’s no longer waiting for memory. Then, if there’s any free memory available, we call signal() on the first waiter (following FIFO semantics) to wake it up and allow it to proceed.

There’s also a deallocate method, which is called by RecordAccumulator once it’s done with the buffer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /**
     * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
     * memory as free.
     *
     * @param buffer The buffer to return
     * @param size The size of the buffer to mark as deallocated, note that this may be smaller than buffer.capacity
     *             since the buffer may re-allocate itself during in-place compression
     */
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.nonPooledAvailableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

We acquire the lock, and if the buffer is of size batch.size, we clear it and add it to the free deque. If it’s not, we simply let the buffer be garbage collected and account for it in nonPooledAvailableMemory.

Most importantly, we call signal() on the first waiter (if there’s one), waking it up so it can use the memory we just freed.

And there you have it! That’s how the producer manages different buffers for the record batches to be sent.

These lines are executed for every Kafka-produced batch, meaning they run billions of times every second. The impact is truly tremendous!

Return to Sender

We return to the Sender thread. Before that, I’d like to note that “Return to Sender” is also the name of a tremendous Elvis Presley song. You might want to give it a listen.

Sender is simply a Runnable:

1
2
3
4
5
/**
 * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
 * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
 */
public class Sender implements Runnable 

its main loop:

1
2
3
4
5
6
7
8
     // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

How about this runOnce? Here it is minus the transactional stuff:

1
2
3
4
        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs); //

client is the network client and poll is to read and write data from the network sockets.

The meat and potatoes are in sendProducerData:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    private long sendProducerData(long now) {
        MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);

        // ...

        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);

        //...

        sendProduceRequests(batches, now);
    } 

We start by calling the ready method of the accumulator and supplying it with a metadataSnapshot ( basically a list of the brokers, available partitions and leaders, etc).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    /**
     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
     * partition batches.
     * <p>
     * A destination node is ready to send data if:
     * <ol>
     * <li>There is at least one partition that is not backing off its send
     * <li><b>and</b> those partitions are not muted (to prevent reordering if
     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
     *   is set to one)</li>
     * <li><b>and <i>any</i></b> of the following are true</li>
     * <ul>
     *     <li>The record set is full</li>
     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
     *     are immediately considered ready).</li>
     *     <li>The accumulator has been closed</li>
     * </ul>
     * </ol>
     */
    public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
        // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
        // cumulative frequency table (used in partitioner).
        for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
            final String topic = topicInfoEntry.getKey();
            nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

This is it! This is where partitions and batches are deemed ready for sending. Where batch.size and linger.ms can shine. We loop over the topicInfoMap entries, and for each topic we call partitionReady:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// inside private long partitionReady

 boolean exhausted = this.free.queued() > 0; // threads waiting for memory
        for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
            Deque<ProducerBatch> deque = entry.getValue();

                // ...

            synchronized (deque) {
                // Deques are often empty in this path, esp with large partition counts,
                // so we exit early if we can.
                ProducerBatch batch = deque.peekFirst();
                if (batch == null) {
                    continue;
                }

                waitedTimeMs = batch.waitedTimeMs(nowMs);
                batch.maybeUpdateLeaderEpoch(leaderEpoch);
                backingOff = shouldBackoff(batch.hasLeaderChangedForTheOngoingRetry(), batch, waitedTimeMs);
                backoffAttempts = batch.attempts();
                dequeSize = deque.size();
                full = dequeSize > 1 || batch.isFull();
            }

             if (leader == null) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            }
            // ...
             nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff,
                    backoffAttempts, full, nextReadyCheckDelayMs, readyNodes);
        }

We get information about the batch. Notably, waitedTimeMs which is how long has batch waited (now - createTime) and whether the batch is full. Then we call batchReady:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    private long batchReady(boolean exhausted, TopicPartition part, Node leader,
                            long waitedTimeMs, boolean backingOff, int backoffAttempts,
                            boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
        if (!readyNodes.contains(leader) && !isMuted(part)) {
            long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
            boolean expired = waitedTimeMs >= timeToWaitMs;
            boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
            boolean sendable = full
                    || expired
                    || exhausted
                    || closed
                    || flushInProgress()
                    || transactionCompleting;
            if (sendable && !backingOff) {
                readyNodes.add(leader);
            } else {
                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                // Note that this results in a conservative estimate since an un-sendable partition may have
                // a leader that will later be found to have sendable data. However, this is good enough
                // since we'll just wake up and then sleep again for the remaining time.
                nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
            }
        }
        return nextReadyCheckDelayMs;
    }

Notice timeToWaitMs. If we’re not backing off due to a retry, we wait for lingerMs. A batch is considered expired if the time we’ve already waited (now - createTime) exceeds lingerMs.

A batch is sendable if:

  • It has expired (we waited longer than linger.ms).
  • It is full (batch.size bytes reached).
  • Buffer memory is exhausted (other threads are waiting).
  • flush() has been called in KafkaProducer.
  • A transaction is completing.

After ready runs, readyNodes is populated with all nodes that have batches ready to be sent. Then, we call drain, which collects the batches from those nodes. For each topic partition, batches are bundled into a request, which is then handed off to the network client. Upon receiving a response, the appropriate callbacks are triggered, and the cycle continues.

One final observation: the Sender thread and the various application threads running RecordAccumulator are synchronized using synchronized (dq). A simple way to think about this is that the KafkaProducer adds records to RecordBatches queues via the RecordAccumulator, while the Sender thread continuously checks for sendable batches (based on various properties:linger.ms, batch.size, etc) and passes them to the network client.

Conclusion

This is nothing but a small snapshot into the innards of the Kafka Producer. A flurry of amazing technical details and smart design decisions have been passed by in this post. I’m also just sharing my understanding while going through the code. If I have misunderstood or got something wrong, please let me know.

If you want to get in touch, you can find me on Twitter at @moncef_abboud.

This post is licensed under CC BY 4.0 by the author.