I. Basic Concepts of Kafka
- Producer: the producer of messages and data; a process, piece of code, or service that publishes messages to a Kafka topic.
- Consumer: the consumer of messages and data; a process, piece of code, or service that subscribes to a topic and processes the messages published to it.
- Consumer Group: a logical concept. For the same topic, every message is delivered to each subscribing group (broadcast across groups), but within a group each message is consumed by only one consumer, because each partition is assigned to exactly one consumer in the group.
- Broker: a physical concept; each Kafka node in a Kafka cluster.
- Topic: a logical concept; the category a message belongs to, used to separate and isolate data.
- Partition: a physical concept and the basic unit of data storage in Kafka. A topic's data is spread across multiple partitions; messages are ordered within a partition, but not across partitions.
- Replication: a partition can have multiple replicas, all holding the same data.
- Replication Leader: among a partition's replicas, one acts as the leader and is responsible for interacting with producers and consumers.
- ReplicaManager: manages information about all partitions and replicas on the current broker; handles requests initiated by the KafkaController, switches replica states, appends and reads messages, and so on.
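To make the consumer-group rule concrete, here is a minimal, self-contained Java sketch that hands out a topic's partitions to the consumers of one group. It uses plain round-robin for illustration only; the class and method names are hypothetical, and this is not the code of Kafka's own assignors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: a simplified round-robin model of how one consumer group
// splits a topic's partitions. Not the code of Kafka's built-in assignors.
final class GroupAssignmentSketch {

    // Maps each consumer in the group to the partitions it owns.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            // Each partition gets exactly one owner inside the group, so a message
            // (which lives in exactly one partition) is consumed once per group.
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

For example, assign(4, List.of("a1", "a2")) gives a1 partitions [0, 2] and a2 partitions [1, 3]. A second group calling assign with its own consumers again receives all four partitions, which is the broadcast-across-groups behaviour. With five consumers and four partitions, one consumer receives nothing, so adding consumers beyond the partition count does not add parallelism.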
II. Features of Kafka
Distributed
- Multiple partitions
- Multiple replicas
- Multiple subscribers
- Coordinated through ZooKeeper (newer releases can also run in KRaft mode without ZooKeeper)
High Performance
- High throughput
- Low latency
- High concurrency
- O(1) time complexity for message persistence, so performance stays stable as stored data grows
Durability and Scalability
- Data can be persisted
- Fault tolerance
- Supports horizontal online scaling
- Automatic message balancing
Reasons for Kafka's Speed
- Multiple partitions enhance concurrency
- Zero-copy
- Sequential writes
- Batch operations for messages
- Page cache
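Zero-copy is the item on this list that maps most directly to a JDK API: Kafka serves log segment data to consumers through FileChannel.transferTo, which the JDK can implement on Linux with the sendfile system call, so bytes move from the page cache to the socket without a round trip through a user-space buffer. The sketch below (class name hypothetical) shows the call with a file as the target so it runs without a network; with a SocketChannel as the target, the same loop applies.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper showing zero-copy transfer with FileChannel.transferTo.
// The data is handed to the target channel by the OS instead of being read
// into a Java byte[] and written back out.
final class ZeroCopySketch {

    // Sends the whole file to `target` and returns the number of bytes moved.
    static long sendFile(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            long position = 0;
            long size = in.size();
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }
}
```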
III. Basic Command Usage of Kafka
The commands below use the older ZooKeeper-based options; since Kafka 3.0, kafka-topics.sh takes --bootstrap-server instead of --zookeeper.

```shell
# Create a topic (4 partitions, 2 replicas)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

# Describe topics (partitions, leaders, replicas, ISR)
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181

# List topics
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list

# List consumer groups of the new consumer (the --new-consumer flag applies to older releases and was later removed)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list

# Show consumption details (offsets, lag) of a consumer group; with --bootstrap-server this covers groups whose offsets are stored in Kafka
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test

# Send messages from the console
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1

# Consume messages from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning
```
IV. Common Kafka Configurations
Producer Configuration
Reference link: Kafka Producer Parameter Settings and Tuning Suggestions
- acks: Specifies how many partition replicas must receive a message before the producer considers the write successful; this parameter has a major effect on the likelihood of message loss.
  - acks=0: The produce request returns immediately without waiting for any acknowledgment from the leader. This gives the highest throughput but no guarantee that the message was actually written.
  - acks=1: The producer receives a success response as soon as the partition leader has received the message. If the message cannot reach the leader (for example, the leader has crashed and a new one has not yet been elected), the producer receives an error response and, to avoid data loss, resends the message. However, if a replica that never received the message becomes the new leader, the message is still lost. Throughput in this mode depends on whether the producer sends synchronously or asynchronously. If the client waits for the server's response (by calling get() on the returned Future), latency clearly increases by one network round trip per send. With callbacks the latency problem is reduced, but throughput is still limited by how many messages can be in flight, that is, how many messages the producer may send before receiving a response.
  - acks=all (or -1): The partition leader waits until the message has been written to all in-sync replicas (ISR) before treating the produce request as successful. This gives the strongest durability guarantee but, in principle, the lowest throughput.
- buffer.memory: The size of the producer's memory buffer, used to hold messages waiting to be sent to the server.
- compression.type: The message compression algorithm: snappy, gzip, or lz4 (zstd is also supported since Kafka 2.1); the default is none.
- retries: How many times the producer resends a message after a failed send.
- batch.size: When several messages are bound for the same partition, the producer groups them into one batch. This parameter sets the memory available to a batch, in bytes (not a number of messages). A batch that is not yet full is still sent once the wait time set by linger.ms has elapsed.
- linger.ms: How long the producer waits for more messages to join a batch before sending it; works together with batch.size.
- client.id: An identifier for the source of the messages, included in server-side request logging.
- max.in.flight.requests.per.connection: How many requests the producer may send on a connection before receiving responses. Higher values use more memory but increase throughput. Setting it to 1 guarantees that messages are written in the order they were sent, even when retries occur.
- Timeout configuration
  - request.timeout.ms: How long the producer waits for the server's response when sending data.
  - metadata.fetch.timeout.ms: How long the producer waits for the server's response when fetching metadata (for example, which broker leads the target partition). Older clients only; later versions replace it with max.block.ms.
  - timeout.ms: How long the broker waits for in-sync replicas to acknowledge a message. Older clients only; later versions replace it with request.timeout.ms.
- max.block.ms: How long the producer may block when calling send() or when fetching metadata through partitionsFor(). When the limit is reached, a timeout exception is thrown.
- max.request.size: The maximum size, in bytes, of a request sent by the producer; this effectively also caps the largest message the producer can send.
- receive.buffer.bytes and send.buffer.bytes: The sizes of the TCP socket buffers used for receiving and sending data.
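Several of the settings above are usually tuned together. As a minimal sketch, assuming a durability-first workload, a producer configuration could be assembled as a java.util.Properties object like this. The keys are real producer configuration names (enable.idempotence is covered in Section V); the values, the class name, and the broker address are illustrative assumptions rather than recommendations for every cluster.

```java
import java.util.Properties;

// Hypothetical helper: durability-oriented producer settings.
// Keys are real producer configs; values are example choices.
final class ProducerConfigSketch {

    static Properties durableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas before a write counts as successful.
        props.put("acks", "all");
        // Retry transient failures; idempotence keeps retries from writing duplicates.
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        props.put("enable.idempotence", "true");
        // The idempotent producer allows at most 5 in-flight requests per connection.
        props.put("max.in.flight.requests.per.connection", "5");
        // A batch is sent when it reaches 16 KB or after 5 ms, whichever comes first.
        props.put("batch.size", "16384");
        props.put("linger.ms", "5");
        props.put("compression.type", "lz4");
        return props;
    }
}
```

The result would be passed to new KafkaProducer<>(props). Combining acks=all with idempotence trades some throughput for fewer lost or duplicated writes, while batch.size, linger.ms, and compression win some of that throughput back by sending larger, smaller-on-the-wire batches.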
Consumer Configuration
- fetch.min.bytes: The minimum amount of data, in bytes, the broker should return for a fetch request, loosely analogous to the producer's batch.size. If less data is available when the consumer's request arrives, the broker waits until enough has accumulated before responding.
- fetch.max.wait.ms: fetch.min.bytes tells Kafka to wait until enough data is available; fetch.max.wait.ms caps how long the broker waits (500 ms by default), analogous to the producer's linger.ms. The broker responds as soon as either condition is met.
- max.partition.fetch.bytes: The maximum number of bytes the server returns to the consumer from each partition.
- session.timeout.ms and heartbeat.interval.ms: heartbeat.interval.ms sets how often the consumer sends heartbeats to the group coordinator, while session.timeout.ms sets how long the consumer may go without sending a heartbeat before it is considered dead and its partitions are reassigned. heartbeat.interval.ms must be lower than session.timeout.ms and is usually set to no more than one third of it.
- auto.offset.reset: What the consumer does when reading a partition that has no committed offset, or whose offset is invalid (for example, because the consumer was inactive for so long that the data at that offset has been deleted). Options: latest (start from the newest records; the default), earliest (start from the oldest available record), none (throw an exception if no previous offset is found); any other value also throws an exception.
- enable.auto.commit: Whether the consumer commits offsets automatically; the default is true. Set it to false when managing offsets manually.
- partition.assignment.strategy: The strategy used to assign partitions to the consumers in a group (for example, RangeAssignor or RoundRobinAssignor).
- client.id: Lets the broker identify which client sent a request.
- max.poll.records: The maximum number of records returned by a single call to poll(), which helps control how much data is handled per polling iteration.
- receive.buffer.bytes and send.buffer.bytes: As for the producer, the sizes of the TCP socket buffers used for receiving and sending data.
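For the manual offset management described in Section V, a consumer configuration might look like the sketch below. The keys are real consumer configuration names; the class name, group name, values, and the helper heartbeatWithinGuideline are illustrative assumptions. The helper encodes the one-third rule of thumb for heartbeat.interval.ms.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings for application-managed offsets.
final class ConsumerConfigSketch {

    static Properties manualCommitConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Offsets are committed by the application, not automatically in the background.
        props.put("enable.auto.commit", "false");
        // With no valid committed offset, start from the oldest available record.
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "500");
        // Example values: heartbeat every 10 s, declared dead after 30 s of silence.
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "10000");
        return props;
    }

    // True if heartbeat.interval.ms is at most one third of session.timeout.ms.
    static boolean heartbeatWithinGuideline(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }
}
```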
V. Kafka Message Transactions
Message Delivery Semantics
- At-most-once: messages may be lost, but are never delivered twice.
- At-least-once: messages are never lost, but may be delivered more than once.
- Exactly-once: each message is processed exactly once; the ideal semantic, but the hardest to achieve.
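Which of these semantics a consumer gets largely depends on when it commits its offset relative to processing. The toy model below (plain Java, no Kafka client; names hypothetical) replays a short log with one simulated crash and a restart from the last committed offset: committing before processing loses the record being handled at the crash (at-most-once), while committing after processing handles it twice (at-least-once).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (no Kafka client involved): a consumer reads `log` from offset 0,
// crashes once at offset crashAt right after its first step, then restarts
// from the last committed offset. Names are hypothetical.
final class DeliverySemanticsSketch {

    // Returns every record that was processed, including replays after the restart.
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;       // next offset to read after a restart
        boolean crashed = false;
        int offset = 0;
        while (offset < log.size()) {
            boolean crashNow = !crashed && offset == crashAt;
            if (commitBeforeProcessing) {
                committed = offset + 1;            // commit first
                if (crashNow) {                    // crash before processing
                    crashed = true;
                    offset = committed;            // restart skips this record
                    continue;
                }
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));    // process first
                if (crashNow) {                    // crash before committing
                    crashed = true;
                    offset = committed;            // restart reads this record again
                    continue;
                }
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }
}
```

With the log m0, m1, m2 and a crash at offset 1, run(log, 1, true) processes [m0, m2], while run(log, 1, false) processes [m0, m1, m1, m2].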
Disaster Scenario Analysis
Broker Failure
Kafka has its own replication mechanism: once a message is successfully written to the leader replica, it is copied to the other replicas. With a replication factor of n, the data is stored n times, so in theory up to n-1 brokers can fail without losing it.
RPC Failure from Producer to Broker
The reliability of Kafka messages rests on the producer receiving an acknowledgment (ack) from the broker. However, the broker may fail after the message has been written but before the ack is returned, or it may crash before the message is written to the topic at all. Because the producer cannot tell which of these happened, all it can do is resend the message, which in some downstream scenarios leads to out-of-order messages or duplicate consumption.
Client Failure
Clients may also crash permanently or temporarily lose their heartbeat. To stay correct, brokers and consumers must discard messages sent by such zombie producers. In addition, when a new client instance starts, it must recover whatever state the failed instance left behind and resume processing from a safe checkpoint, which requires the consumed offsets to stay in sync with the output messages actually produced.
Kafka's Exactly-once Semantic Guarantee
Idempotence: Exactly-once semantic guarantee within partitions
Idempotence means that performing the same operation multiple times yields the same result. The producer's send() operation can be made idempotent: whenever a retry causes the producer to send the same message more than once, it is written only once. This is a producer setting, not a broker one: set enable.idempotence = true in the producer configuration.
Principle: similar to cumulative acknowledgment in TCP reliable transmission. Each RecordBatch sent by the producer carries a monotonically increasing sequence number (seq), and for each producer ID (PID) and partition the broker keeps the sequence number of the last successful write (lastSeq), updating it on every write. When a RecordBatch arrives, the broker checks it before storing the data: if the batch's baseSeq (the seq of its first message) is exactly lastSeq + 1, the data is saved; otherwise it is not:
- If baseSeq is greater than lastSeq + 1, some earlier data has not been written yet, so the broker rejects the batch with an out-of-order sequence error (OutOfOrderSequenceException on the producer side). This prevents reordering in the case where an earlier message fails, a later one succeeds, and the earlier one then succeeds on retry.
- If baseSeq is less than or equal to lastSeq, the batch is a duplicate: the broker discards it and answers with a duplicate-sequence error instead of writing the data again.
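The broker-side check can be modelled in a few lines of Java. This is a simplified sketch, not broker code: the class and enum names are hypothetical, state is keyed by producer ID and partition, and details that real brokers handle, such as producer epochs and a cache of recent batches, are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the duplicate/out-of-order check for an idempotent producer.
final class SequenceCheckSketch {

    enum Result { APPENDED, OUT_OF_ORDER, DUPLICATE }

    // Key: "producerId-partition"; value: seq of the last message written.
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result onBatch(long producerId, int partition, int baseSeq, int recordCount) {
        String key = producerId + "-" + partition;
        int last = lastSeq.getOrDefault(key, -1); // -1 means nothing written yet
        if (baseSeq == last + 1) {
            // Exactly the next expected sequence: write it and advance lastSeq.
            lastSeq.put(key, baseSeq + recordCount - 1);
            return Result.APPENDED;
        }
        if (baseSeq > last + 1) {
            // Gap: an earlier batch has not been written yet, so refuse this one.
            return Result.OUT_OF_ORDER;
        }
        // baseSeq <= last: already written, so drop the retry instead of storing it twice.
        return Result.DUPLICATE;
    }
}
```

A retry of an already written batch comes back DUPLICATE and is not stored twice, while a batch that skips ahead is refused as OUT_OF_ORDER until the missing batch arrives.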
Transactional Guarantee: Atomic operations across partitions
Kafka provides a transaction API for atomic write operations across partitions:
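```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Assumes `producer` was created with transactional.id configured,
// and record1/record2 are ProducerRecords (possibly for different partitions).
producer.initTransactions();  // registers the transactional.id and fences older instances
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();  // both records become visible atomically
} catch (ProducerFencedException e) {
    // Another instance with the same transactional.id has taken over; this one cannot recover.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort, so neither record is exposed to read_committed consumers.
    producer.abortTransaction();
}
```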
Kafka introduces a Transaction ID and an epoch. A Transaction ID corresponds one-to-one to a PID; the difference is that the Transaction ID is supplied by the user, while the PID is assigned internally by Kafka and is transparent to the user. This guarantees:
- Idempotent sending across sessions: when a new producer instance with the same Transaction ID starts, the epoch is incremented. Because the old producer's epoch is now smaller than the new producer's, the old producer with the same Transaction ID can no longer operate.
- Transaction recovery across sessions: if an application instance crashes, the new instance ensures that any unfinished transactions from the old one are either committed or aborted, so it starts working from a clean state.
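Fencing by epoch can be sketched as follows. This toy model stands in for the transaction coordinator and keeps only the latest epoch per Transaction ID; the class and method names are hypothetical, not Kafka APIs. In the real client, an instance whose epoch is stale receives a ProducerFencedException, which is the case the transaction snippet above handles by closing the producer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator model: the latest epoch per Transaction ID wins.
final class EpochFencingSketch {

    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Stand-in for initTransactions(): the first instance gets epoch 0,
    // each later instance with the same id gets the previous epoch + 1.
    int register(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A request may proceed only if it carries the latest epoch for its id.
    boolean accept(String transactionalId, int epoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest != null && epoch == latest;
    }
}
```

After a second register("tx-1"), requests carrying the first instance's epoch are refused, so a zombie producer cannot write into the new instance's transactions.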
From the consumer's perspective, the transactional guarantee is weaker. In particular, there is no guarantee that all messages committed in one transaction are consumed together, because:
- For compacted topics, some messages of a transaction may be replaced by newer versions with the same key.
- The messages of a transaction may span multiple segments (even within the same partition), and when old segments are deleted, part of the transaction's data may be lost.
- A consumer may use seek to jump to any offset inside a transaction, skipping some of its messages.
- A consumer may not consume every partition involved in a transaction, so it may never read all of the messages that make up that transaction.
Development Implementation Ideas
Custom offset management is needed so that message processing and the offset update happen in the same transaction, for example by writing the processed data to MySQL and updating the message offset in the same database transaction.
Implementation:
- Set enable.auto.commit = false to disable automatic offset commits.
- While processing messages, save the offset together with the results, for example in MySQL.
- Partition changes can trigger a rebalance; implement the ConsumerRebalanceListener interface in a custom class to capture these events and handle offsets.
- After partitions are reassigned and before the consumer starts reading messages, call seek(TopicPartition, long) to move to the saved offset in each partition.
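The essential property is that a result and its offset are stored atomically. The in-memory sketch below illustrates it without a database or Kafka client: a synchronized method stands in for the MySQL transaction, and all names are hypothetical. In a real consumer, onPartitionsAssigned in the ConsumerRebalanceListener would call consumer.seek(partition, offset) for each assigned partition, using the value returned by startingOffset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical in-memory stand-in for "results + offsets in one database transaction".
// Each synchronized method plays the role of one transaction.
final class TransactionalOffsetStoreSketch {

    private final Map<String, Long> nextOffset = new HashMap<>(); // key: "topic-partition"
    private final List<String> results = new ArrayList<>();

    // The offset a consumer would seek to after (re)assignment of this partition.
    synchronized long startingOffset(String topicPartition) {
        return nextOffset.getOrDefault(topicPartition, 0L);
    }

    // Stores the processed result and the next offset together; records that were
    // already applied (offset below the stored position) are skipped.
    synchronized boolean processAndStore(String topicPartition, long offset, String value) {
        if (offset < startingOffset(topicPartition)) {
            return false; // already applied in an earlier transaction
        }
        results.add(value.toUpperCase(Locale.ROOT)); // hypothetical processing step
        nextOffset.put(topicPartition, offset + 1);
        return true;
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }
}
```

Because the stored offset is the next one to read, a restart that replays already-applied records is harmless: processAndStore skips them instead of producing duplicate results.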