Kafka メッセージオフセットメカニズム#
消費者のオフセットの提出#
消費者は、_consumer_offset という特別なトピックにメッセージを送信し、メッセージには各パーティションのオフセットが含まれています。
消費者が常に稼働している場合、オフセットはあまり意味がありません。しかし、消費者がクラッシュしたり、新しい消費者がグループに参加したりすると、再均衡
がトリガーされます。再均衡が完了すると、各消費者は以前処理していたものではなく、新しいパーティションに割り当てられる可能性があります。以前の作業を続けるために、消費者は各パーティションの最後に提出されたオフセットを読み取り、オフセットで指定された場所から処理を続ける必要があります。この時、提出されたオフセットとクライアントが処理しているオフセットとの不一致により、重複消費やメッセージの喪失が発生する可能性があります。
重複消費#
提出されたオフセットがクライアントが処理した最後のメッセージのオフセットよりも小さい場合、2 つのオフセットの間にあるメッセージが重複して処理されます。
メッセージの喪失#
提出されたオフセットがクライアントが処理した最後のメッセージのオフセットよりも大きい場合、両者の間にあるメッセージが失われます。
オフセットの提出方法#
自動提出(at most once)#
enable.auto.commit が true に設定されている場合、消費者はオフセットを自動的に提出し、提出の時間間隔は auto.commit.interval.ms によって制御され、デフォルトは 5 秒です。
最も簡単な提出方法ですが、メッセージ処理の状況を明確に把握することができず、メッセージの重複消費やメッセージの喪失が発生しやすくなります。
現在のオフセットを手動で提出(at least once)#
KafkaConsumer API を使用して、必要なときに現在のオフセットを手動で提出できます。時間間隔に基づくのではなく。
-
commitSync()
同期的に提出し、poll () から返された最新のオフセットを提出します。信頼性を保証できますが、提出時にプログラムがブロック状態になるため、スループットが制限されます。 -
commitAsync()
非同期的に提出し、メッセージの信頼性を保証しません。カスタムコールバック処理をサポートし、提出エラーの記録やメトリクスの生成に使用されます。
特定のオフセットを提出#
オフセットの提出頻度はメッセージバッチの処理頻度と同じです。つまり、通常はバッチメッセージを処理した後にオフセットを 1 回提出しますが、たとえば poll () メソッドが大量のデータを返す場合、再均衡(詳細は次のセクションで説明)による重複処理を避けるために、バッチの途中でオフセットを提出したい場合があります。この場合、commitSync ()/commitAsync () メソッドを呼び出すときに、提出したいパーティションとオフセットを含むマップを渡すことで実現できます。しかし、消費者は 1 つのパーティションだけを読み取るわけではないため、すべてのパーティションのオフセットを追跡する必要があります。このため、このレベルでオフセットの提出を制御すると、コードが複雑になります。
特定のオフセットからレコードの処理を開始#
// トピックtopicのすべての利用可能なパーティションpartitionの情報を取得
partitionInfos = kafkaConsumer.partitionsFor(topic);
Collection<TopicPartition> partitions = null;
if (partitionInfos != null) {
// 消費者は自分にパーティションを割り当てます。ここでは例としてすべてのパーティションを消費することに相当します。実際の状況では特定のパーティションを割り当てるロジックを追加できます。
partitionInfos.forEach(partitionInfo -> {
partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
});
kafkaConsumer.assign(partitions);
}
ConsumerCommitOffset.commitOffset(kafkaConsumer);
再均衡#
Kafka の消費モード#
Kafka からメッセージを消費する際、Kafka クライアントは 2 つのモードを提供します:パーティション消費、グループ消費。
消費者数とパーティション数の関係#
- 1 つの消費者は 1 つからすべてのパーティションデータを消費できます。
- グループ消費では、同じグループ内のすべての消費者が完全なデータを消費します。この場合、1 つのパーティションデータは 1 人の消費者によってのみ消費され、1 人の消費者は複数のパーティションデータを消費できます。
- 同じ消費グループ内で、消費者数がパーティション数を超えると、消費者には余剰が生じます = パーティション数 - 消費者数。この場合、新しいグループを追加することができます。
グループ消費の再均衡戦略#
私たちは、1 つのグループ内の消費者が共同でトピックのパーティションデータを消費することを知っています。新しい消費者がグループに参加すると、他の消費者が読み取っていたメッセージを読み取ります。消費者が閉じられたりクラッシュしたりすると、その消費者はグループを離れ、元々その消費者が読み取っていたパーティションはグループ内の他の消費者によって読み取られます。また、トピックに変更があり、新しいパーティションが追加された場合も、パーティションの再割り当てが発生します。
これらの状況では、パーティションの所有権が 1 人の消費者から別の消費者に移転されます。これがパーティションの再均衡
であり、消費者グループに高可用性とスケーラビリティをもたらしますが、再均衡中は消費者がメッセージを読み取ることができず、グループ全体が短時間利用できなくなるという欠点があります。
パラメータpartition.assignment.strategy
を設定して、割り当て戦略を選択します。2 つの割り当て戦略があります:
RangeAssignor#
トピックのいくつかの連続したパーティションを消費者に割り当てます。デフォルトはこれです。
消費者 C1 と消費者 C2 が同時にトピック T1 とトピック T2 を購読し、各トピックに 3 つのパーティションがあると仮定します。消費者 C1 は、これら 2 つのトピックのパーティション 0 とパーティション 1 に割り当てられる可能性があり、消費者 C2 はこれら 2 つのトピックのパーティション 2 に割り当てられます。
RoundRobinAssignor#
トピックのすべてのパーティションを消費者に順番に割り当てます。
RoundRobin 戦略を使用して消費者 C1 と消費者 C2 にパーティションを割り当てる場合、消費者 C1 はトピック T1 のパーティション 0 とパーティション 2、およびトピック T2 のパーティション 1 に割り当てられ、消費者 C2 はトピック T1 のパーティション 1 およびトピック T2 のパーティション 0 とパーティション 2 に割り当てられます。一般的に、すべての消費者が同じトピックを購読している場合(この状況は非常に一般的です)、RoundRobin 戦略はすべての消費者に同じ数のパーティションを割り当てます(または最大で 1 つのパーティションの差があるだけです)。
参考#
パーティションハートビートメカニズム#
消費者はハートビートメカニズムに依存して GroupCoordinator に生存を報告し、ハートビートを送信してグループとの従属関係およびパーティションの所有権関係を維持します。
具体的な参考:Kafka ソースコード解析:グループ調整管理メカニズム
再均衡リスナー#
オフセットを提出する際、消費者は退出し、パーティションの再均衡を行う前にいくつかのクリーンアップ作業を行います。リスナーのメソッドを定義することで、バッファレコードの処理やデータベース接続操作による情報の保存(たとえば、Mysql
を利用してオフセットを手動で維持する)などの特別なロジック処理を行うことができます。
実装は非常に簡単で、消費トピックを購読する際に再均衡リスナーのインスタンスを渡すだけです(インターフェースを自分で実装できます)。
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// 再均衡が始まる前と消費者がメッセージの読み取りを停止した後に呼び出されます。
// ... 自分で処理ロジックを実装
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
// パーティションの再割り当て後と消費者がメッセージの読み取りを開始する前に呼び出されます。
}
});