Apache Kafka is a distributed streaming platform that provides five main capabilities:
You can find all of them in the guide to the Kafka Protocol.
Components
Broker
Management tools:
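Important configuration parameters (in the Value column, a value in square brackets is the default, and a value in front of the brackets is the recommended setting):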
Essential configs
| Name | Value | Note | 
|---|---|---|
| broker.id | related to the hostname | Should be a unique value for every broker | 
| zookeeper.connect | host:2181[,host2:2181…]/chroot/path | A comma-separated list of ZooKeeper addresses |
| log.dirs | path to directory | Where Kafka stores its log data. Can also be a comma-separated list of directories |
| listeners | listener://host:port[PLAINTEXT://:9092] | Also valid: CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093 (custom listener names like these must be mapped in listener.security.protocol.map) |
| background.threads | [10] | The number of threads to use for various background processing tasks | 
| num.io.threads | [8] | The number of threads that the server uses for processing requests (including I/O) | 
| num.network.threads | [3] | The number of threads that the server uses for receiving requests and sending responses | 
| queued.max.requests | [500] | The number of queued requests allowed for the data plane before the network threads are blocked |
Resiliency
| Name | Value | Note | 
|---|---|---|
| broker.rack | meaningful string | Specify it to perform availability zone aware operations | 
| default.replication.factor | 2 or 3 [1] | The number of copies of the data. 1 means NO extra copies |
| min.insync.replicas | 1 or 2[1] | Depends on the replication factor. With acks=all, the producer gets an exception (NotEnoughReplicasException) if fewer than this number of replicas are in sync |
| message.max.bytes | [1Mb] | Depends on the cluster size. Should not be higher than fetch.message.max.bytes on the consumer, or the consumer may be unable to fetch large messages |
| auto.create.topics.enable | false[true] | Should be disabled. Otherwise, even a metadata request (such as checking if the topic exists) will create a new topic | 
| max.partition.fetch.bytes | [1Mb] | Consumer config: the maximum amount of data per partition the server will return. message.max.bytes (broker config) and max.message.bytes (topic config) define other limits |
| delete.topic.enable | [true] | If disabled, prevents accidental topic deletion |
| num.replica.fetchers | [1] | Increasing this value can increase I/O parallelism in the broker | 
| num.recovery.threads.per.data.dir | ~10[1] | The number of threads per data directory used for log recovery at startup and for flushing at shutdown (total threads = this value × number of log.dirs) |
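The following arithmetic sketch in Java shows how default.replication.factor and min.insync.replicas interact for a single partition written with acks=all. The class and method names are hypothetical. It assumes unclean.leader.election.enable stays false.

```java
/** Hypothetical helper: failure budget of one partition written with acks=all. */
class DurabilityMath {

    /** Replicas that may fail while acks=all writes still succeed. */
    static int failuresWhileWritable(int replicationFactor, int minInsyncReplicas) {
        check(replicationFactor, minInsyncReplicas);
        // Writes are accepted only while at least min.insync.replicas replicas are in sync.
        return replicationFactor - minInsyncReplicas;
    }

    /** Replicas that may fail without losing an acknowledged write. */
    static int failuresWithoutDataLoss(int replicationFactor, int minInsyncReplicas) {
        check(replicationFactor, minInsyncReplicas);
        // An acknowledged write is stored on at least min.insync.replicas replicas,
        // so one copy survives as long as fewer than that many replicas are lost
        // (assuming no unclean leader election).
        return minInsyncReplicas - 1;
    }

    private static void check(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                    "expected 1 <= min.insync.replicas <= replication factor");
        }
    }
}
```

With a replication factor of 3 and min.insync.replicas=2, both methods return 1. One broker can go down without blocking writes or losing acknowledged data. With min.insync.replicas=1, writes keep flowing through two failures, but failuresWithoutDataLoss returns 0.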
Rebalancing
| Name | Value | Note | 
|---|---|---|
| unclean.leader.election.enable | [false] | Can lead to data loss if enabled, because an out-of-sync replica may become the leader |
| auto.leader.rebalance.enable | false[true] | Keep it disabled to avoid pauses for leader election |
| replica.lag.time.max.ms | [30000] | A follower that has not caught up with the leader for this long is removed from the ISR |
| leader.imbalance.check.interval.seconds | [300] | How often the controller checks whether partition leaders are balanced |
| leader.imbalance.per.broker.percentage | [10] | The allowed leader imbalance per broker. The controller triggers a leader rebalance if it goes above this value |
These settings are NOT mutually exclusive: whichever limit is reached first triggers segment rotation or deletion.
Rotation
| Name | Value | Note | 
|---|---|---|
| log.segment.bytes | 1GB | You may need to increase the OS inode setting when segments are small |
| log.roll.ms | [null] | Closes the segment after the given number of ms, even if it is still small. If unset, log.roll.hours [168] applies |
Retention
| Name | Value | Note | 
|---|---|---|
| log.retention.bytes | [-1] | Size based retention is unlimited by default | 
| log.retention.(hours/minutes/ms) | [168 hours] | The smaller time unit takes precedence. Default is 1 week. | 
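Because the limits are not mutually exclusive, whichever one is reached first wins. The Java sketch below models that logic in simplified form. The class and method names are hypothetical. The real broker also rolls segments for other reasons, for example when an index fills up.

```java
/** Hypothetical, simplified model of the rotation and retention settings. */
class LogLimits {
    static final long MINUTE_MS = 60_000L;
    static final long HOUR_MS = 60 * MINUTE_MS;

    /** A segment is rolled when EITHER the size limit OR the age limit is reached. */
    static boolean shouldRoll(long segmentBytes, long segmentAgeMs,
                              long logSegmentBytes, long logRollMs) {
        return segmentBytes >= logSegmentBytes || segmentAgeMs >= logRollMs;
    }

    /**
     * Effective time-based retention. A null argument means "not set".
     * log.retention.ms wins over log.retention.minutes, which wins over log.retention.hours.
     */
    static long effectiveRetentionMs(Long ms, Long minutes, Long hours) {
        if (ms != null) {
            return ms; // -1 means no time limit
        }
        if (minutes != null) {
            return minutes * MINUTE_MS;
        }
        long h = (hours != null) ? hours : 168L; // default: one week
        return h * HOUR_MS;
    }
}
```

For example, effectiveRetentionMs(null, 30L, 48L) gives 30 minutes, because the minutes setting overrides the hours setting.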
Partitioning
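num.partitions - should be a multiple of the number of brokers. The number of partitions of an existing topic cannot be reduced.
With key-based partitioning, increasing the number is also a no-go: existing keys get mapped to different partitions, which breaks per-key ordering.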
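To see why adding partitions breaks key-based ordering, map the same keys with two partition counts and count how many move. Kafka's default partitioner hashes the serialized key with murmur2. This sketch swaps in String.hashCode() purely to stay dependency-free. The exact partition numbers therefore differ from Kafka's, but the effect is the same. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo; String.hashCode() stands in for Kafka's murmur2 hash. */
class KeyRemapDemo {

    /** Simplified key-to-partition mapping: hash modulo partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Keys whose partition changes when the topic grows from 'before' to 'after' partitions. */
    static List<String> movedKeys(List<String> keys, int before, int after) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, before) != partitionFor(key, after)) {
                moved.add(key);
            }
        }
        return moved;
    }
}
```

With a well-distributed hash, about three out of four keys land on a different partition when a topic goes from 6 to 8 partitions. New records for those keys then go to a different partition than their older records.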
| producer | broker | consumer | 
|---|---|---|
| 100 Mb/sec => | 10 partitions | => 10Mb/sec | 
The empirical rule is to pick a number of partitions such that each of them grows by approximately 6 GB/day.
How to choose the number of topics/partitions in a Kafka cluster?
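The table and the 6 GB/day rule reduce to a little arithmetic. The Java sketch below assumes you have measured per-partition producer and consumer throughput yourself. The class name, the parameter names and the use of decimal gigabytes are assumptions, not part of any Kafka tool.

```java
/** Hypothetical back-of-the-envelope partition sizing. */
class PartitionSizing {
    static final long GB = 1_000_000_000L; // decimal gigabyte (assumption)

    /** Ceiling division for positive numbers. */
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Enough partitions so that neither the producer nor the consumer side of one partition is the bottleneck. */
    static long forThroughput(long targetMBps, long producerMBpsPerPartition, long consumerMBpsPerPartition) {
        return Math.max(ceilDiv(targetMBps, producerMBpsPerPartition),
                        ceilDiv(targetMBps, consumerMBpsPerPartition));
    }

    /** Enough partitions so that each one grows by roughly 6 GB per day. */
    static long forDailyVolume(long bytesPerDay) {
        return ceilDiv(bytesPerDay, 6 * GB);
    }

    /** Round up to a multiple of the broker count, as suggested for num.partitions. */
    static long roundToBrokers(long partitions, int brokers) {
        return ceilDiv(partitions, brokers) * brokers;
    }
}
```

forThroughput(100, 50, 10) returns 10, matching the table. roundToBrokers(10, 3) bumps that to 12 so every broker in a 3-node cluster gets the same number of partitions.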
Compaction
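compression.type = [producer] (uncompressed|snappy|lz4|gzip|zstd) - on the broker, [producer] keeps whatever codec the producer used. Compression is not the same as log compaction (cleanup.policy=compact). Bigger batches show better compression results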
Check official documentation at https://kafka.apache.org/documentation/#compaction
ZooKeeper
Run 3 or 5 ZooKeeper nodes so that the ensemble can reach consensus by a majority (quorum).
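The odd numbers come from majority-quorum arithmetic: the ensemble keeps working while more than half of its nodes are up. A tiny Java sketch, with a hypothetical class name:

```java
/** Hypothetical helper: majority-quorum arithmetic for a ZooKeeper ensemble. */
class QuorumMath {

    /** Smallest majority of an ensemble with the given number of servers. */
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** Servers that may fail while a majority is still available. */
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }
}
```

Three nodes tolerate one failure and five tolerate two. A fourth node adds no extra tolerance: toleratedFailures(4) is still 1.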
Group coordinator
The coordinator assigns partitions to consumers and detects consumer availability via heartbeats.
You can access data from the group coordinator through bin/kafka-consumer-groups.sh:
bin/kafka-consumer-groups.sh --describe --group my-group [--members] [--verbose] [--state] --bootstrap-server localhost:9092
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Producers
acks
- acks=0 - fire and forget. With this setting, retries have no effect, and the offset returned for each record is always -1
- acks=1 - confirmation only from the leader (the default before Kafka 3.0)
- acks=all (or -1) - confirmation from all in-sync replicas (the default since Kafka 3.0). Still, it requires min.insync.replicas to be more than the default 1 to be meaningful
| Name | Value | Note | 
|---|---|---|
| delivery.timeout.ms | [120000] | This value should be greater than or equal to request.timeout.ms + linger.ms |
| retries | [2147483647] | With retries > 0 and more than one in-flight request, batches can be reordered unless idempotence is enabled. It's better to control delivery with delivery.timeout.ms and leave this one at its default |
| request.timeout.ms | [30000] | How long the producer waits for a response before it retries the request |
| retry.backoff.ms | [100] | The time to wait before retrying publishing | 
| buffer.memory | [32MB] | The buffer size for records waiting to be sent (33554432 bytes by default) |
| max.block.ms | [60000] | How long send() can block, for example when the buffer is full or metadata is unavailable, before it throws an exception |
| batch.size | [16384] | Size in bytes. A record bigger than the batch size is sent in a batch of its own. This value should be lower than max.request.size. Bigger batches show better compression results |
| linger.ms | 30[0] | The time allowed to collect a batch of messages for publishing. Increasing this number adds latency, but reduces load | 
| max.in.flight.requests.per.connection | 1[5] | Setting it to 1 keeps messages ordered even when retries happen. With enable.idempotence=true, ordering is also preserved for values up to 5 |
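The producer settings above can be collected in a java.util.Properties object. This sketch only builds and checks the properties. In a real application you would pass them to a KafkaProducer from the kafka-clients library. The values are this section's suggestions, not universal defaults. The ProducerSettings class and its validate helper are hypothetical, and the rule validate checks comes from the delivery.timeout.ms row.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the producer settings discussed above. */
class ProducerSettings {

    static Properties recommended(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");                                // wait for all in-sync replicas
        props.setProperty("linger.ms", "30");                            // trade a little latency for bigger batches
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("delivery.timeout.ms", "120000");              // upper bound for send + retries
        props.setProperty("max.in.flight.requests.per.connection", "1"); // strict ordering
        return props;
    }

    /** Enforces delivery.timeout.ms >= request.timeout.ms + linger.ms (defaults as in the table). */
    static void validate(Properties props) {
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms", "120000"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        long linger = Long.parseLong(props.getProperty("linger.ms", "0"));
        if (delivery < request + linger) {
            throw new IllegalArgumentException(
                    "delivery.timeout.ms must be >= request.timeout.ms + linger.ms");
        }
    }
}
```

Running validate after you change linger.ms or request.timeout.ms catches a delivery.timeout.ms that is too small before the producer is created.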
Consumers
Basic considerations:
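- Consumers that share the work of reading a topic must have the same group.id.
- The maximum number of active consumers in a group equals the number of partitions they subscribe to. Any extra consumers stay idle.
- Consumers can subscribe to topics by regex - a powerful but dangerous feature.
- The main method is poll(timeout). It fetches records and must be called at least once every max.poll.interval.ms, or the consumer is considered failed and removed from the group. Since Kafka 0.10.1, heartbeats are sent from a background thread.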
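To see why extra consumers stay idle, here is a simplified sketch of range-style assignment for a single topic. Consumers are sorted, and each gets a contiguous block of partitions, with the first ones receiving one extra. It illustrates the idea behind Kafka's RangeAssignor but is not its code. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-topic sketch of range-style partition assignment. */
class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(int partitions, List<String> consumerIds) {
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int perConsumer = partitions / sorted.size();
        int extra = partitions % sorted.size(); // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned); // an empty list means an idle consumer
        }
        return result;
    }
}
```

With 3 partitions and consumers c1 to c4, each of c1, c2 and c3 gets one partition, and c4 gets an empty list.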
Configuration parameters:
Fetching
| Name | Value | Note | 
|---|---|---|
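| fetch.min.bytes | [1] | The minimum amount of data the broker returns for a fetch request. Increasing it could be useful to reduce the load |
| fetch.max.wait.ms | [500] | The maximum time the broker waits for fetch.min.bytes to accumulate, so that even small chunks are eventually returned |
| fetch.max.bytes | [52428800] | This is not an absolute maximum: a record batch larger than the limit is still returned so the consumer can make progress. Also, the consumer performs multiple fetches in parallel |
| partition.assignment.strategy | (RoundRobin, Sticky, CooperativeSticky)[Range] | How partitions are distributed among the consumers of a group |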
Timeouts
| Name | Value | Note | 
|---|---|---|
| session.timeout.ms | [45sec] (10sec before Kafka 3.0) | At least 3 * heartbeat.interval.ms, plus some margin |
| heartbeat.interval.ms | [3sec] | The time between consecutive heartbeats to the consumer coordinator |
| max.poll.interval.ms | [5min] | The maximum delay between invocations of poll() | 
| default.api.timeout.ms | [1min] | Specifies the timeout (in milliseconds) for client APIs | 
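These timeout relationships can be checked up front. The following is a hypothetical validation helper in Java. worstCaseBatchProcessingMs is an assumed input: your own estimate of how long processing one batch returned by poll() can take.

```java
/** Hypothetical helper that checks the consumer timeout rules from the table above. */
class ConsumerTimeouts {

    static void validate(long sessionTimeoutMs, long heartbeatIntervalMs,
                         long maxPollIntervalMs, long worstCaseBatchProcessingMs) {
        // Leave room for at least three heartbeats before the coordinator gives up on the consumer.
        if (sessionTimeoutMs < 3 * heartbeatIntervalMs) {
            throw new IllegalArgumentException(
                    "session.timeout.ms should be at least 3 * heartbeat.interval.ms");
        }
        // Processing between two poll() calls must finish within max.poll.interval.ms,
        // otherwise the consumer is removed from the group and a rebalance starts.
        if (worstCaseBatchProcessingMs >= maxPollIntervalMs) {
            throw new IllegalArgumentException(
                    "batch processing time must stay below max.poll.interval.ms");
        }
    }
}
```

validate(45_000, 3_000, 300_000, 60_000) passes with the current defaults and one minute of processing per batch.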
Committing
| Name | Value | Note | 
|---|---|---|
| enable.auto.commit | false[true] | Better to turn it off, to prevent data loss | 
| auto.commit.interval.ms | [5s] | Ignored if enable.auto.commit = false | 
A consumer should commit offsets explicitly, for each partition, and it should commit synchronously before closing the connection.
- consumer.commitSync(offsets) - commits exactly the offsets you pass, for example the last successfully processed offset + 1 for each partition
- consumer.commitSync() - commits the offsets of everything returned by the last poll(), including records that failed to process
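A committed offset is the position of the next record to read. For commitSync(offsets), you therefore commit the last processed offset + 1. The plain-Java sketch below computes that map. In real code the keys would be TopicPartition and the values OffsetAndMetadata from kafka-clients. The Processed type and the "topic-partition" string keys are hypothetical simplifications.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: which offsets to pass to commitSync(offsets). */
class CommitOffsets {

    /** Hypothetical helper type: position of a record that was processed successfully. */
    static final class Processed {
        final String topic;
        final int partition;
        final long offset;

        Processed(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Per "topic-partition": highest processed offset + 1, i.e. the next record to read. */
    static Map<String, Long> offsetsToCommit(List<Processed> processed) {
        Map<String, Long> next = new TreeMap<>();
        for (Processed p : processed) {
            next.merge(p.topic + "-" + p.partition, p.offset + 1, Long::max);
        }
        return next;
    }
}
```

After processing offsets 41 and 42 of orders-0 and offset 7 of orders-1, the map to commit is orders-0 → 43 and orders-1 → 8.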
Offsets
| Name | Value | Note | 
|---|---|---|
| auto.offset.reset | (earliest, none)[latest] | What to do when there is no committed offset, or the committed offset is out of range |
When the consumer has no known offset to start from, the order of actions goes like this. First, find the offset you need using methods like:
consumer.position(partition)
consumer.seekToBeginning(partitions)
consumer.seekToEnd(partitions)
Then set the offset:
consumer.seek(partition, offset)
And only after that, start the polling loop:
consumer.poll(timeout)
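The decision you make before calling seek() mirrors what auto.offset.reset does. Use the committed offset if it is valid, and otherwise fall back to the beginning or the end of the partition. The following is a pure-Java sketch. In a real consumer, the beginning and end offsets could come from consumer.beginningOffsets() and consumer.endOffsets(). The StartOffset class and ResetPolicy enum are hypothetical.

```java
/** Hypothetical sketch of the start-offset decision made before consumer.seek(). */
class StartOffset {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * committed may be null (nothing committed yet). beginning and end are the
     * partition's first offset and log-end offset.
     */
    static long choose(Long committed, long beginning, long end, ResetPolicy policy) {
        if (committed != null && committed >= beginning && committed <= end) {
            return committed; // a valid committed offset: resume from it
        }
        switch (policy) {
            case EARLIEST:
                return beginning;
            case LATEST:
                return end;
            default:
                throw new IllegalStateException("no valid offset and reset policy is none");
        }
    }
}
```

A committed offset of 5 on a partition whose data now starts at offset 10, for example because of retention, is out of range. With EARLIEST, the consumer restarts from 10.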
Registry
Schema Registry (from Confluent) is not part of the Apache Kafka distribution. It integrates with Kafka clients through serializers and provides schema validation for messages in Apache Avro format (newer versions also support Protobuf and JSON Schema).
Monitoring
- Monitoring Kafka in Production
- Monitoring Kafka with Kafka exporter + Prometheus + Grafana
- Server metrics
- Producer metrics
- Consumer metrics
- Logging
Server metrics
Apart from the must-have metrics for JVM applications, such as GC cycles, you need OS metrics for resource consumption:
- CPU
- Memory
- Disk space
- Disk IO
- Network
Above all, pay attention to the number of open file descriptors and compare it to the maximum number of available file descriptors.
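On HotSpot/OpenJDK JVMs running on Unix-like systems, the java.lang:type=OperatingSystem MBean exposes OpenFileDescriptorCount and MaxFileDescriptorCount. The sketch below reads both through any MBeanServerConnection. That can be the local one for a quick test, or a remote JMX connection to a broker, assuming the broker has remote JMX enabled. The class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Hypothetical helper: open/max file descriptor ratio of a JVM, read over JMX. */
class FileDescriptorCheck {

    static double usageRatio(MBeanServerConnection conn) throws Exception {
        ObjectName os = new ObjectName("java.lang:type=OperatingSystem");
        long open = ((Number) conn.getAttribute(os, "OpenFileDescriptorCount")).longValue();
        long max = ((Number) conn.getAttribute(os, "MaxFileDescriptorCount")).longValue();
        return (double) open / max;
    }

    /** Same check against the current JVM, handy for trying the code out. */
    static double localUsageRatio() throws Exception {
        return usageRatio(ManagementFactory.getPlatformMBeanServer());
    }
}
```

For a broker, the connection could come from JMXConnectorFactory.connect(url).getMBeanServerConnection(). You might alert when the ratio goes above a threshold of your choosing.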
Under-replicated Partitions
# JMX MBean
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
This is the most important metric to show cluster health. Depending on the symptom, you need to choose the right mitigation strategy.
Symptom: replication factor > UnderReplicatedPartitions > 0
First, check whether all partition leaders are the preferred replicas. If they are not, and auto.leader.rebalance.enable is turned off as recommended, run kafka-preferred-replica-election (kafka-leader-election --election-type preferred in newer versions) to fix the problem.
Cruise Control is a tool that automates rebalancing and some other operational tasks, and it has a nice UI.
Symptom: UnderReplicatedPartitions > 0 constantly from many brokers
If the total number of Under-replicated Partitions reported by all brokers equals the number of partitions assigned to one of the brokers - it’s highly likely there is an offline broker. Restart it and bring it back to life.
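The heuristic above translates directly into code. In this hypothetical Java helper, the per-broker partition counts are assumed to come from the PartitionCount metric recorded while all brokers were healthy. The total comes from summing UnderReplicatedPartitions over the live brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for the "offline broker" heuristic. */
class OfflineBrokerHint {

    /**
     * Returns the ids of brokers whose partition count equals the cluster-wide
     * number of under-replicated partitions; each one is a likely offline broker.
     */
    static List<Integer> suspects(long totalUnderReplicated, Map<Integer, Long> partitionsPerBroker) {
        List<Integer> result = new ArrayList<>();
        if (totalUnderReplicated <= 0) {
            return result; // nothing is under-replicated
        }
        for (Map.Entry<Integer, Long> entry : partitionsPerBroker.entrySet()) {
            if (entry.getValue().longValue() == totalUnderReplicated) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

If brokers 1, 2 and 3 normally host 120, 118 and 121 partitions and the live brokers report 118 under-replicated partitions in total, broker 2 is the prime suspect.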
Symptom: the ISR size is flapping. The UnderReplicatedPartitions number fluctuates, and no brokers are offline.
Usually, this symptom signals a performance issue.
Performance issues
| Observation | Conclusion | 
|---|---|
| All UnderReplicatedPartitions are located on the same instance | Other brokers have trouble fetching data over the network from this instance |
| A single instance reports UnderReplicatedPartitions > 0 | The opposite case: one instance cannot communicate with the other brokers. Run ./kafka-topics ... --describe --under-replicated-partitions to check this hypothesis |
| Slow Disk IO | One bad disk can affect the whole cluster because producers (with acks=all) wait for an ack from more than one broker. Storage performance is critical to cluster health |
| CPU starvation | Can be caused by other activity on the server or by mismatched configuration across brokers |
| Resource starvation | There should be thresholds and alerts for CPU, Disk and Network utilisation |
| Uneven load distribution | The kafka-reassign-partitions tool helps to redistribute the load |
The following attributes should have similar values for all brokers in the cluster to achieve optimal performance:
- Partition count
- Leaders partition count
- Topics In/Out Bytes/s
- Messages/s
Offline Partitions
# JMX MBean
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
This is the number of partitions without a leader. They don't accept any new messages until the controller elects a new leader.
Active Controller Count
# JMX MBean
kafka.controller:type=KafkaController,name=ActiveControllerCount
Summed across all brokers, it should always be 1 (each broker reports 1 if it is the active controller, and 0 otherwise).
If > 1, a broker got stuck and will cause problems for admin tasks. You can fix this by restarting that broker so that a new controller is elected.
If = 0, the cluster is stuck in its last controlled state. This can be caused by a broken connection with ZooKeeper.
After you mitigate the root cause, perform a rolling restart of the cluster.
Topic Metrics
# JMX MBeans
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
These metrics indicate the traffic growth rate.
Partition Count
# JMX MBeans
kafka.server:type=ReplicaManager,name=PartitionCount
kafka.server:type=ReplicaManager,name=LeaderCount
A ratio to track for a well-balanced cluster: LeaderCount / PartitionCount ≈ 1 / ReplicationFactor
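A hypothetical check for that ratio follows. It assumes all topics share the same replication factor and uses a tolerance you pick yourself.

```java
/** Hypothetical check of the LeaderCount / PartitionCount ratio for one broker. */
class LeaderBalance {

    static boolean isBalanced(long leaderCount, long partitionCount,
                              int replicationFactor, double tolerance) {
        if (partitionCount == 0) {
            return true; // a broker without partitions has nothing to balance
        }
        double ratio = (double) leaderCount / partitionCount;
        double expected = 1.0 / replicationFactor;
        return Math.abs(ratio - expected) <= tolerance;
    }
}
```

With a replication factor of 3, a broker hosting 100 partition replicas should lead about 33 of them. isBalanced(60, 100, 3, 0.05) returns false, which suggests running a preferred leader election.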
Request metrics
# JMX MBean
kafka.network:type=RequestMetrics,name=<METRIC_TYPE>,request=<REQUEST_NAME>
For example, a spike in the 99th percentile of the Produce request time (name=TotalTimeMs,request=Produce) is a sign of performance issues.
Producer metrics
# JMX MBeans
kafka.producer:type=producer-metrics,client-id=<CLIENT_ID>
kafka.producer:type=producer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.producer:type=producer-topic-metrics,client-id=<CLIENT_ID>,topic=<TOPIC_NAME>
Important attributes:
- record-error-rate - How often produced records are erroring out. It counts only records that are actually dropped
- request-latency-avg - The baseline for latency alerts
- record-queue-time-avg - The time spent in the send buffer. A good indicator for tuning configs such as batch.size and linger.ms
- produce-throttle-time-avg - Indicates throttling, if quotas are enabled
Metrics that show how much data we produce:
- outgoing-byte-rate - The average number of outgoing bytes sent per second
- record-send-rate - The average number of records sent per second
- request-rate - The average number of requests sent per second
Consumer metrics
# JMX MBeans
kafka.consumer:type=consumer-metrics,client-id=<CLIENT_ID>
kafka.consumer:type=consumer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<CLIENT_ID>[,topic=<TOPIC_NAME>]
Latency
- fetch-latency-avg - The north star metric for tuning fetch.min.bytes and fetch.max.wait.ms
- fetch-throttle-time-avg - Indicates throttling, if quotas are enabled
Incoming data
- bytes-consumed-rate - The average number of bytes consumed per second
- records-consumed-rate - The average number of records consumed per second
Consumer Lag
Consumer lag is the difference between the offset of the last message produced to a partition (the log-end offset) and the last offset consumed and committed by the consumer group. Use Burrow to aggregate lag across consumers into a single status and to track it.
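Lag per partition is the log-end offset minus the committed offset. The following is a minimal Java sketch with hypothetical names. It assumes the offsets were already fetched, for example from the CURRENT-OFFSET and LOG-END-OFFSET columns of bin/kafka-consumer-groups.sh --describe. As a simplification, it treats a partition with no committed offset as starting from 0.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consumer lag calculation for one topic of one consumer group. */
class ConsumerLag {

    /** Per-partition lag = log-end offset - committed offset (missing commit treated as 0). */
    static Map<Integer, Long> perPartition(Map<Integer, Long> logEndOffsets,
                                           Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** Total lag of the group across all partitions. */
    static long total(Map<Integer, Long> lagPerPartition) {
        long sum = 0;
        for (long partitionLag : lagPerPartition.values()) {
            sum += partitionLag;
        }
        return sum;
    }
}
```

A growing total while producer traffic is flat usually means the consumers are falling behind rather than the load increasing.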
Consumer group coordinator
# JMX MBean
kafka.consumer:type=consumer-coordinator-metrics,client-id=<CLIENT_ID>
| Metric | Unit | Description | 
|---|---|---|
| sync-time-avg | Time in milliseconds | If it is too high, decrease the number of partitions to let them sync quicker |
| sync-rate | The number of group syncs per second | Should be 0 for a stable consumer group |
| commit-latency-avg | Time in milliseconds | How fast offsets are committed. Should be comparable to the producer's request-latency-avg |
| assigned-partitions | The number of partitions assigned to a consumer | Shows the load imbalance across the consumer group |
Logging
Recommended log levels:
| Component | Log Level | Description | 
|---|---|---|
| kafka.controller | INFO | It contains information about topic creation, broker number changes and partition movement | 
| kafka.server.ClientQuotaManager | INFO | Logs events on throttling producers and consumers | 
| kafka.request.logger | (DEBUG/TRACE) | Only for troubleshooting. It shows every request with latency and response. | 
Thanks to Justin Pihony for his course on Pluralsight.