Kafka Latency and Throughput
https://openmessaging.cloud/docs/benchmarks/ https://github1s.com/confluentinc/openmessaging-benchmark/blob/master/driver-kafka/kafka-all.yaml
- Latency: the time between a message being produced and it being consumed
Properties:
producer
- buffer.memory
- batch.size (default 16k)
- linger.ms (default 0) // send immediately
- “acks=0”, “acks=1”, and default: “acks=all”
- max.in.flight.requests.per.connection (default 5)
consumer
- fetch.min.bytes
- fetch.max.bytes
broker
- min.insync.replicas
The broker sends data to the consumer when either of the following is met:
- fetch.min.bytes: defaults to 1 (optimized for latency, i.e. give me data ASAP)
- fetch.max.wait.ms: defaults to 500
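A minimal consumer sketch of that trade-off (topic name, group id, and bootstrap server are placeholders): leaving fetch.min.bytes at 1 favors latency, while raising it favors throughput at the cost of up to fetch.max.wait.ms of extra delay.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchTuningConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "lat-test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Latency-oriented default: the broker responds as soon as any data is available.
        props.put("fetch.min.bytes", "1");
        // Throughput-oriented alternative: wait for bigger batches, bounded by fetch.max.wait.ms.
        // props.put("fetch.min.bytes", "65536");
        props.put("fetch.max.wait.ms", "500");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("lat"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}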
# producer with a 10 second linger
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic lat --producer-property linger.ms=10000
# the consumer will receive the messages only 10 seconds later
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lat --from-beginning
Tail latency: why does it matter?
https://www.confluent.io/blog/kafka-fastest-messaging-system/ :
- running Kafka with synchronous fsync is extremely uncommon and also unnecessary
- rely on replication rather than fsync for durability
- use the OS page cache flush policy instead of fsync'ing after each message: flush.messages and flush.ms (nb: the message count and the timeout that trigger a flush/fsync both default to 9223372036854775807, i.e. basically infinity)
- 3 brokers with 64 GB of memory
Throughput test: we can achieve 600 MB/s throughput with 3x replication, the default fsync policy (i.e. no fsync), a 10 MB batch size, and linger.ms=10
Latency test: 300K messages of 1 KB, i.e. 300 MB, with 10-20 ms latency; linger.ms=1, fsync off
RabbitMQ starts overloading the CPU (latency increases dramatically, ~2 seconds at 36 MB/s) above roughly 30 MB/s (that's why Kafka wins at high throughput), but below that load RabbitMQ achieves sub-5 ms latency, better than Kafka.
If fsync is on, Kafka's latency gets higher: >60 ms at p99.9.
With no replication (giving up availability) => Kafka latency ~5 ms. RabbitMQ can hit a 1 ms p99 latency, but only as long as the message load is low.
====
https://www.confluent.io/blog/configure-kafka-to-minimize-latency/
With SSL/TLS enabled, the broker cannot leverage zero-copy data transfer using sendfile.
Min.insync.replicas defines the number of replicas that have to be in sync for the broker to accept writes for the partition. This configuration impacts availability, but it DOES NOT impact end-to-end latency. The message has to be replicated to all replicas that are in sync with the leader regardless of the min.insync.replicas configuration. Thus, choosing a smaller min.insync.replicas does not reduce commit time or latency as a result.
If we care about tail latencies: the default linger.ms=0 (supposedly optimal for latency) can result in many tiny requests that overwhelm the broker and cause bursts. Slightly increasing linger.ms to 5 or 10 increases the average latency, but it cures the bursts at the tail end (p99.9).
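A rough producer sketch of that trade-off (topic name and bootstrap server are placeholders): a small linger trades a few ms of average latency for fewer, larger requests and a smoother p99.9.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TailLatencyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // linger.ms=0 (the default) sends each record as soon as possible -> many tiny requests.
        // A small linger lets records accumulate into fewer, larger requests, smoothing the tail.
        props.put("linger.ms", "5");
        props.put("batch.size", "65536"); // allow bigger batches than the 16 KB default

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("lat", Integer.toString(i), "payload-" + i));
            }
        } // close() flushes any records still sitting in the accumulator
    }
}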
More clients, even at the same produce/consume rate, lead to more load on the brokers because of metadata requests and more bookkeeping to maintain connections/state; the same applies to in-flight requests => max.in.flight.requests.per.connection * nb_producers
Too many partitions have a negative effect on end-to-end latency:
- less batching
- more requests for replica fetch
- more state and bookkeeping
More requests to the broker => larger request queue => more time to handle each request
===== Picking the number of partitions ===
- More partitions => more throughput
- let's say per-partition consumption throughput is c and per-partition produce throughput is p, and the desired throughput is t. We need at least max(t/p, t/c) partitions, e.g. t=100 MB/s, p=20 MB/s, c=50 MB/s => max(100/20, 100/50) = 5 partitions
- More partitions => more time to recover when a broker fails (leader movement is sequential)
- More partitions => more time when the controller fails (the new controller needs to fetch all partition metadata from ZK)
- by default, there's only a single thread to fetch replica data, so more partitions => more latency ====> more partitions hurt latency but are good for throughput
- Producer and consumer memory consumption is proportional to nb of partitions
==== tune latency ===
- num.replica.fetchers defaults to 1 and is the number of threads used to replicate from the leader. Increase it if followers can't keep up
- compression takes CPU cycles but reduces bandwidth. You might want to disable it for extremely latency-sensitive workloads
=== producer max.block.ms ===
Default value: 60000 (60 seconds). The producer puts messages in a buffer before sending them (to account for max.in.flight, compression, linger.ms, batch.size). The max.block.ms setting controls how long the producer will wait for space to become available in the buffer (buffer.memory) before throwing an exception. This is useful to avoid the producer waiting indefinitely if the broker is slow or unresponsive. It only applies WHEN THE BUFFER (buffer.memory) is full.
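A small sketch of that behavior (broker address, topic, and sizes are illustrative): with a deliberately tiny buffer.memory and a broker that can't keep up, send() blocks for up to max.block.ms and then throws a TimeoutException.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BufferFullDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "65536");   // deliberately tiny accumulator (default is 32 MB)
        props.put("max.block.ms", "5000");     // block at most 5 s when the buffer is full

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String payload = "x".repeat(10_000); // ~10 KB per record
            for (int i = 0; i < 100_000; i++) {
                // If the broker can't drain the buffer fast enough, this call blocks for up to
                // max.block.ms and then throws a TimeoutException.
                producer.send(new ProducerRecord<>("lat", Integer.toString(i), payload));
            }
        }
    }
}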
=== enable.auto.commit ===
By default, the consumer commits offsets when poll() is called (with auto.commit.interval.ms, default 5 seconds, as an upper bound on the interval). If a failure occurs after a lot of records have been processed but not yet committed, duplicate processing will happen.
If processing is done in the main poll loop (no extra threads), no offset can be committed without its record having been processed; but if messages are handed off to another thread for processing, there are no such guarantees.
Commit frequency is a trade-off between performance and the number of duplicates in the event of a crash.
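A rough at-least-once sketch (topic and group names are placeholders): auto-commit off, offsets committed only after the whole batch has been processed, so a crash causes re-processing rather than loss.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "orders-processor");
        props.put("enable.auto.commit", "false"); // we decide when offsets are committed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // a crash here means the batch is re-delivered => possible duplicates
                }
                consumer.commitSync(); // commit only after the whole batch is processed: at-least-once
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("processing offset=%d value=%s%n", record.offset(), record.value());
    }
}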
=== at-most-once ===
At-most-once delivery: disable retries on the producer and commit offsets in the consumer prior to processing a batch of messages.
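For comparison, an at-most-once consumer sketch (same placeholder names): offsets are committed right after poll() and before processing, so a crash mid-batch skips those records instead of re-processing them; on the producer side one would also set retries=0.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "orders-fire-and-forget");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                consumer.commitSync(); // commit BEFORE processing: a crash below loses these records
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}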
=== idempotent producer ===
enable.idempotence set to true:
- a Producer ID (PID) is assigned to the producer by the broker and is transparent to users
- each message has a sequence id per partition; a retried message keeps the same seq id, so if the broker sees an already-seen (lower) seq id on a retry, it rejects it and the producer gets a duplicate error that can be ignored
- if the producer has a transactional.id, the PID is linked to it and thus consistent across restarts, so idempotence persists after a restart. If there is no transactional.id, the producer is given a new PID on restart, and the previous seq ids and dedup are lost
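A minimal sketch of enabling it (broker address and topic are placeholders); note that idempotence requires acks=all and max.in.flight.requests.per.connection <= 5.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true"); // broker dedups retries using PID + per-partition seq ids
        props.put("acks", "all");                // required for idempotence
        props.put("max.in.flight.requests.per.connection", "5"); // must be <= 5 with idempotence

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A retry of this send (e.g. after a network hiccup) reuses the same sequence id,
            // so the broker will not append it twice.
            producer.send(new ProducerRecord<>("payments", "key-1", "value-1"));
        }
    }
}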
=== Transaction ===
- produce to multiple TopicPartitions atomically (and/or commit consumer offsets as part of the transaction via sendOffsetsToTransaction): all writes to these TopicPartitions succeed or fail as a unit
- this enables “consume-transform-produce” as a single atomic unit
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // another producer with the same transactional.id was started
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}
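A rough sketch of the consume-transform-produce pattern above (topic, group, and transactional.id values are placeholders), committing the consumed offsets inside the same transaction as the produced output:

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ConsumeTransformProduce {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092");
        cProps.put("group.id", "transform-group");
        cProps.put("enable.auto.commit", "false");       // offsets are committed via the transaction
        cProps.put("isolation.level", "read_committed"); // only see committed transactional data
        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092");
        pProps.put("transactional.id", "transform-app-1"); // enables transactions (and idempotence)
        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Collections.singletonList("input"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                try {
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output", record.key(),
                                record.value().toUpperCase())); // the "transform" step
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Commit the consumed offsets as part of the same transaction.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    producer.close();
                    return;
                } catch (KafkaException e) {
                    producer.abortTransaction(); // output and offset commits are rolled back together
                }
            }
        }
    }
}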
- consumer: isolation.level: read_committed or read_uncommitted
- transactional.id has a generation id (producer epoch); upon restart it's incremented and old generations are fenced. When a new generation starts, any old-generation transactions are guaranteed to be committed or aborted.
- For the consumer, transactional messages can straddle multiple segments, which can be compacted or deleted, so there is no guarantee the consumer sees all old transactions as a whole; also, a consumer may not consume from all of the transaction's partitions (and so not see all of its messages). Handling this is up to the app.
- at the end of a transaction (abort/commit), the transaction coordinator writes an ABORT(PID) or COMMIT(PID) marker message to every TopicPartition of the transaction. After that, another COMMIT or ABORT message is written to the transaction log.
- the broker has transaction.max.timeout.ms (default 15 min), which is the max time allowed for a transaction
- To prevent interference with a client's transactions, the transactional.id can be protected with an ACL (like groups, ...)
- read_committed consumers can only read up to the Last Stable Offset (LSO), meaning that if a transaction is in progress, the consumer won't advance. This adds delay.
- If a malicious transactional producer never ends a transaction (or waits until the max timeout), this blocks consumers; that's why ACLs are required
==== consumer assignor ===
Range assignor: order the consumers, then for each topic assign partition i to consumer i => not balanced, e.g. if there are fewer partitions than consumers, the last consumers won't get anything
=> GREAT FOR CO-LOCATING KEYS => key X goes to partition 0 of all topics => the same consumer gets it and can join them
Round robin assignor: order all partitions regardless of topic and deal them out across all consumers => more balanced, but no co-location
Sticky assignor => tries to keep partitions on their existing consumer during a rebalance (avoids a lot of data movement and state rebuilding)
Cooperative sticky: with the other assignors, when a rebalance happens (a new consumer, a consumer leaving, a new topic, ...), consumers release all their partitions and don't resume processing until the new assignment is determined. This is BAD: stop-the-world, lots of lag. => With cooperative sticky, consumers take part in the rebalance but don't release partitions until they are explicitly told to at the end (if ever, because with stickiness most keep their existing partitions).
With static membership, if a consumer restarts and rejoins before session.timeout.ms, no rebalance is triggered.
- The EAGER rebalance protocol requires a consumer to always revoke all its owned partitions before participating in a rebalance event; it therefore allows a complete reshuffling of the assignment. The COOPERATIVE rebalance protocol allows a consumer to retain its currently owned partitions before participating in a rebalance event.
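A consumer configuration sketch combining both ideas (group, instance id, and topic names are placeholders): the cooperative-sticky assignor for incremental rebalances plus group.instance.id for static membership, so a quick restart within session.timeout.ms doesn't trigger a rebalance.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CooperativeStickyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "orders-processor");
        // Incremental (cooperative) rebalancing: partitions are only revoked when actually reassigned.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Static membership: rejoining with the same instance id within session.timeout.ms
        // does not trigger a rebalance.
        props.put("group.instance.id", "orders-processor-node-1");
        props.put("session.timeout.ms", "45000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            // ... poll loop as usual ...
        }
    }
}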