
Kafka Message Offset Mechanism#

Consumer Offset Submission#

Consumers commit offsets by sending messages to a special topic called __consumer_offsets, which records the committed offset for each partition. While a consumer keeps running, these committed offsets have little effect. But if the consumer crashes or a new consumer joins the group, a rebalance is triggered, and after it completes each consumer may be assigned new partitions rather than the ones it previously processed. To pick up where the previous owner left off, a consumer reads the last committed offset for each partition and resumes processing from there. At this point, duplicate consumption or message loss can occur when the committed offset differs from the offset the client has actually processed.

Duplicate Consumption#

If the committed offset is less than the offset of the last message processed by the client, then messages between the two offsets will be processed again.

Message Loss#

If the committed offset is greater than the offset of the last message processed by the client, then messages between the two will be lost.

Offset Submission Methods#

Automatic Submission (at most once)#

When enable.auto.commit is set to true, the consumer will automatically commit offsets, with the submission interval controlled by auto.commit.interval.ms, defaulting to 5 seconds. This is the simplest submission method, but it does not clearly indicate the message processing status, making it prone to duplicate consumption and message loss.
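Automatic commits are enabled purely through configuration. A minimal sketch (the broker address and group id below are placeholders, not from the original):

```java
import java.util.Properties;

// Hypothetical consumer configuration; broker address and group id are placeholders
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
// Commit offsets automatically, every 5 seconds (the default interval)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
```

These properties would then be passed to the KafkaConsumer constructor; from that point on, the consumer commits the offsets returned by poll() on the configured interval without any commit calls in application code.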

Manual Submission of Current Offset (at least once)#

Using the KafkaConsumer API, the current offset can be manually committed when necessary, rather than based on a time interval.

  • commitSync()
    Synchronous submission, which will commit the latest offset returned by poll(), ensuring reliability, but since the program will be in a blocking state during submission, it limits throughput.

  • commitAsync()
    Asynchronous submission, which does not guarantee message reliability, supports custom callback handling for logging submission errors or generating metrics.

Submitting Specific Offsets#

The frequency of offset submission is the same as the frequency of processing message batches, meaning that typically, offsets are submitted after processing a batch of messages. However, sometimes, for example, when the poll() method returns a large batch of data, to avoid duplicate processing of the entire batch due to rebalance (see the next section for details), we may want to submit offsets in between batches. This can be achieved by passing a map containing the desired partitions and offsets to commitSync()/commitAsync() methods. However, since the consumer may read from multiple partitions, tracking offsets for all partitions can complicate the code.
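The approach above can be sketched as follows: keep a map from partition to the next offset to consume, and commit it mid-batch at a fixed interval of records. The consumer instance, the `running` flag, the `process` handler, and the threshold of 1000 records are all illustrative assumptions:

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

while (running) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical per-record handler
        // Commit the offset of the *next* message to be read, hence +1
        currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        // Commit in the middle of a batch, every 1000 records,
        // instead of once per poll() batch
        if (++count % 1000 == 0) {
            kafkaConsumer.commitAsync(currentOffsets, null);
        }
    }
}
```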

Starting Record Processing from a Specific Offset#

// Obtain information about all available partitions for the topic
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
Collection<TopicPartition> partitions = new ArrayList<>();

if (partitionInfos != null) {

    // The consumer assigns partitions to itself; this example takes all
    // partitions, but logic can be added to select specific ones
    partitionInfos.forEach(partitionInfo ->
            partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));

    kafkaConsumer.assign(partitions);

    // After assign(), seek to the desired starting position before polling:
    // seekToBeginning()/seekToEnd(), or seek(partition, offset) for a specific offset
    kafkaConsumer.seekToBeginning(partitions);
}

ConsumerCommitOffset.commitOffset(kafkaConsumer);

Rebalance#

Kafka Consumption Modes#

Kafka provides two modes for consuming messages: partition consumption and group consumption.

Relationship Between Number of Consumers and Number of Partitions#

  • One consumer can consume data from one partition up to all partitions.
  • In group consumption, the consumers within a group together consume one complete copy of the data: each partition is consumed by only one consumer in the group, while one consumer can consume data from multiple partitions.
  • When the number of consumers in a group exceeds the number of partitions, the surplus consumers sit idle (number of idle consumers = number of consumers - number of partitions); at that point a new consumer group can be created instead.

Rebalance Strategy for Group Consumption#

We know that consumers within a group jointly consume partition data of the topic. When a new consumer joins the group, it reads messages that were originally read by other consumers. When a consumer is shut down or crashes, it leaves the group, and the partitions it was reading will be read by other consumers in the group. Additionally, if the topic has changes, such as new partitions being added, partition reassignment will occur.

In these situations, the ownership of partitions is transferred from one consumer to another, which is called partition rebalance. It provides high availability and scalability for consumer groups, but the downside is that during the rebalance, consumers cannot read messages, causing a brief period of unavailability for the entire group.

By setting the parameter partition.assignment.strategy, you can choose the assignment strategy, which has two types:

RangeAssignor#

Assigns several contiguous partitions of the topic to consumers; this is the default.
Assuming consumers C1 and C2 subscribe to topics T1 and T2, and each topic has 3 partitions, then consumer C1 may be assigned partitions 0 and 1 of both topics, while consumer C2 is assigned partition 2 of both topics.

RoundRobinAssignor#

Assigns all partitions of the topic to consumers one by one.
If the RoundRobin strategy is used to assign partitions to consumers C1 and C2, then consumer C1 will receive partitions 0 and 2 of topic T1 and partition 1 of topic T2, while consumer C2 will be assigned partition 1 of topic T1 and partitions 0 and 2 of topic T2. Generally, if all consumers subscribe to the same topics (which is common), the RoundRobin strategy will assign an equal number of partitions to all consumers (or at most, one partition difference).
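The strategy is selected purely through configuration. A minimal sketch (the class name is Kafka's actual assignor implementation; the rest of the consumer configuration is omitted):

```java
import java.util.Properties;

// Select the RoundRobin strategy; the default is
// org.apache.kafka.clients.consumer.RangeAssignor
Properties props = new Properties();
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
```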

References#

Partition Heartbeat Mechanism#

Consumers report their status to the GroupCoordinator through a heartbeat mechanism: by sending periodic heartbeats they maintain their membership in the group and their ownership of partitions. For more details, refer to: Kafka Source Code Analysis: Group Coordination Management Mechanism
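The two consumer settings that drive this mechanism can be sketched as configuration; the values below are illustrative choices, not requirements:

```java
import java.util.Properties;

Properties props = new Properties();
// How long the GroupCoordinator waits without receiving a heartbeat
// before declaring the consumer dead and triggering a rebalance
props.put("session.timeout.ms", "10000");
// How often the consumer sends heartbeats; usually set to no more
// than one third of session.timeout.ms
props.put("heartbeat.interval.ms", "3000");
```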

Rebalance Listener#

Before a consumer loses ownership of its partitions (when it exits or a rebalance begins), some cleanup work can be done. We can define a listener to handle this logic, such as committing offsets, flushing buffered records, or saving state through a database connection (for example, storing offsets in MySQL for manual maintenance).

The implementation is straightforward: pass a ConsumerRebalanceListener instance when subscribing to the topic (you can implement the interface yourself).

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        // Called before the rebalance starts, after the consumer stops reading messages
        // ... implement your own processing logic
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        // Called after partitions are reassigned, before the consumer starts reading messages
    }
});