Yige

Yige

Build

Kafkaの基礎

一、Kafka の基本概念#

  • Producer: メッセージとデータの生産者で、Kafka のトピックにメッセージを発行するプロセス / コード / サービス
  • Consumer: メッセージとデータの消費者で、データ(トピック)を購読し、発行されたメッセージを処理するプロセス / コード / サービス
  • Consumer Group: 論理概念で、同じトピックに対して異なるグループにブロードキャストされる。1 つのグループ内では、1 つのコンシューマーだけがそのメッセージを消費できる
  • Broker: 物理概念で、Kafka クラスター内の各 Kafka ノード
  • Topic: 論理概念で、Kafka メッセージのカテゴリで、データを区別し、隔離する
  • Partition: 物理概念で、Kafka におけるデータストレージの基本単位。1 つのトピックデータは複数のパーティションに分散して保存され、各パーティションは順序付けられている
  • Replication: 同じパーティションに複数のレプリケーションが存在する可能性があり、複数のレプリケーション間のデータは同じである
  • Replication Leader: 1 つのパーティションの複数のレプリカの中で、プロデューサーとコンシューマーとのインタラクションを担当するリーダーが必要
  • ReplicaManager: 現在のブローカーのすべてのパーティションとレプリカの情報を管理し、KafkaController からのリクエストを処理し、レプリカの状態の切り替え、メッセージの追加 / 読み取りなどを行う

二、Kafka の特徴#

分散型#

  • 多パーティション
  • 多レプリカ
  • 多購読者
  • ZooKeeper に基づくスケジューリング

高性能#

  • 高スループット
  • 低遅延
  • 高同時実行性
  • 時間計算量は O (1)

永続性と拡張性#

  • データの永続化
  • フォールトトレランス
  • 水平オンライン拡張のサポート
  • メッセージの自動バランス

Kafka が速い理由#

  • 多パーティションによる同時実行性の向上
  • ゼロコピー
  • 順次書き込み
  • メッセージのバッチ操作
  • ページキャッシュ

三、Kafka の基本コマンド使用#

  • トピックの作成(4 つのパーティション、2 つのレプリカ)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  • クラスターの説明を照会
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181
  • トピックリストの照会
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
  • 新しいコンシューマーリストの照会
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list
  • 特定の消費グループの消費詳細を表示(オフセットが ZooKeeper に保存されている場合のみサポート)
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test
  • プロデューサーがメッセージを送信
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1
  • コンシューマーがメッセージを消費
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning

四、Kafka の一般的な設定#

プロデューサーの設定#

参考リンク: kafka プロデューサー Producer パラメータ設定およびパラメータ調整の提案

  1. acks
    メッセージが成功裏に書き込まれたと見なすために、いくつのパーティションレプリカがメッセージを受け取る必要があるかを指定する。このパラメータはメッセージの喪失の可能性に重要な影響を与える

    • acks=0
      プロデュースリクエストが即座に返され、リーダーからの確認を待つ必要がない。この方法は最高のスループットを持つが、メッセージが本当に成功裏に送信されたかどうかは保証されない
    • acks=1
      クラスターのリーダーノードがメッセージを受け取ると、プロデューサーはサーバーからの成功応答を受け取る。メッセージがリーダーノードに到達できない場合(例えば、リーダーノードがクラッシュした場合、新しいリーダーがまだ選出されていない場合)、プロデューサーはエラー応答を受け取る。データの喪失を避けるために、プロデューサーはメッセージを再送信する。しかし、メッセージを受け取っていないノードが新しいリーダーになると、メッセージは依然として失われる。この時のスループットは、Kafka プロデューサーがデータを Kafka に書き込む際に同期送信か非同期送信かによって異なる。送信クライアントがサーバーの応答を待つ場合(Future オブジェクトの get () メソッドを呼び出すことによって)、明らかに遅延が増加する(ネットワーク上での往復遅延)。クライアントがコールバックを使用する場合、遅延の問題は緩和されるが、スループットは送信中のメッセージの数によって制限される(例えば、プロデューサーがサーバーの応答を受け取る前に何個のメッセージを送信できるか)。
    • acks=all/-1
      パーティションリーダーは、メッセージがすべての ISR レプリカ(同期レプリカ)に成功裏に書き込まれるのを待たなければ、プロデュースリクエストが成功したとは見なさない。この方法は最高のメッセージ永続性保証を提供するが、理論的にはスループットも最も悪い
  2. buffer.memory
    プロデューサーのメモリバッファのサイズを設定し、プロデューサーはこれを使用してサーバーに送信するメッセージをバッファリングする

  3. compression.type
    メッセージ圧縮アルゴリズムを設定し、snappy、gzip、または lz4 に設定できる

  4. retries
    プロデューサーがメッセージを再送信できる回数

  5. batch.size
    同じパーティションに送信する必要がある複数のメッセージがある場合、プロデューサーはそれらを同じバッチにまとめて送信する。このパラメータは、バッチが使用できるメモリサイズを指定し、バイト数で計算される(メッセージの数ではなく)。ただし、バッチが満杯でない場合でも、linger.msで設定された上限に達するとバッチが送信される

  6. linger.ms
    プロデューサーがバッチを送信する前の待機時間を指定し、batch.size設定と組み合わせて使用する

  7. client.id
    メッセージの発信元を識別するためのフラグ

  8. max.in.flight.requests.per.connection
    プロデューサーがサーバーの応答を受け取る前に送信できるメッセージの数を指定する。値が高いほど、より多くのメモリを占有するが、スループットも向上する。これを 1 に設定すると、メッセージが送信された順序でサーバーに書き込まれることが保証され、再試行が発生しても順序が保たれる

  9. タイムアウト設定

    • request.timeout.ms: プロデューサーがデータを送信する際にサーバーからの応答を待つ時間
    • metadata.fetch.timeout.ms: プロデューサーがメタデータ(例えば、ターゲットパーティションのリーダーは誰か)を取得する際にサーバーからの応答を待つ時間
    • timeout.ms: ブローカーが同期レプリカからメッセージ確認を待つ時間
  10. max.block.ms
    このパラメータは、send () メソッドを呼び出すか、partitionsFor () メソッドを使用してメタデータを取得する際のプロデューサーのブロック時間を指定し、上限に達するとタイムアウト例外がスローされる

  11. max.request.size
    プロデューサーが送信するリクエストのサイズを制御するために使用される

  12. receive.buffer.bytessend.buffer.bytes
    これらの 2 つのパラメータは、それぞれTCPソケットの受信および送信データパケットのバッファサイズを指定する

コンシューマー設定#

  1. fetch.min.bytes
    この属性は、コンシューマーがサーバーから取得するレコードの最小バイト数を指定し、プロデューサーのbuffer.memory設定に類似している。ブローカーがコンシューマーのデータリクエストを受け取ると、利用可能なデータ量がfetch.min.bytesで指定されたサイズ未満の場合、十分な利用可能データがあるまで返さない

  2. fetch.max.wait.ms
    fetch.min.bytesを使用して Kafka に十分なデータがあるまで返さないように指示する。一方、fetch.max.wait.msはブローカーの待機時間を指定し、デフォルトは 500ms で、プロデューサーのlinger.ms設定に類似している

  3. max.partition.fetch.bytes
    サーバーが各パーティションからコンシューマーに返す最大バイト数を指定する

  4. session.timeout.msheartbeat.interval.ms
    heartbeat.interval.msは poll () メソッドがコーディネーターにハートビートを送信する頻度を指定し、session.timeout.msはコンシューマーがどれくらいの時間ハートビートを送信しないことができるかを指定する。heartbeat.interval.mssession.timeout.msより小さくなければならず、一般的にはsession.timeout.msの 3 分の 1 である

  5. auto.offset.reset
    コンシューマーがオフセットのないパーティションを読み取るか、オフセットが無効な場合(コンシューマーが長時間無効になり、オフセットを含むレコードが古くなり削除された)にどう処理するかを指定する(latest/earliest/none/ その他)

  6. enable.auto.commit
    コンシューマーがオフセットを自動的にコミットするかどうかを指定し、デフォルト値は true で、手動でオフセットを管理する場合は false に設定する必要がある

  7. partition.assignment.strategy
    パーティション割り当て戦略

  8. client.id
    ブローカーがクライアントから送信されたメッセージを識別するために使用する

  9. max.poll.records
    この属性は、poll () メソッドの単一呼び出しで返されるレコードの数を制御するために使用され、ポーリングで処理するデータ量を制御するのに役立つ

  10. receive.buffer.bytessend.buffer.bytes

五、Kafka メッセージトランザクション#

メッセージ配信セマンティクス#

  • At-most-once(最大 1 回): メッセージが失われる可能性がある
  • At-least-once(最小 1 回): メッセージが重複する可能性がある
  • Exactly-once(正確に 1 回): 理想的なセマンティクスの実現だが、実装が難しい

災害シナリオ分析#

ブローカーの失敗
Kafka には独自のバックアップメカニズムがあり、メッセージがリーダーレプリカに成功裏に書き込まれた後、n 部冗長化され、他のレプリカに同期されるため、理論的には n-1 のブローカーノードのダウンを許容できる

プロデューサーからブローカーへの RPC 失敗
Kafka のメッセージの信頼性は、プロデューサーがブローカーからの ack 確認情報を受け取ることに基づいているが、ブローカーがメッセージを書き込んだ後、ack 確認情報を返す前に故障が発生する可能性がある。また、メッセージがトピックに書き込まれる前にダウンする可能性もある。プロデューサー側は失敗の原因を知ることができず、メッセージを再送信しようとするため、いくつかの下流シナリオではメッセージの順序が乱れたり、コンシューマーが重複して消費する可能性がある

クライアントの失敗
クライアントも永続的なダウンや一時的なハートビートの喪失が発生する可能性がある。正確性を追求する場合、ブローカーとコンシューマーはゾンビプロデューサーから送信されたメッセージを破棄する必要がある。同時に、新しいクライアントインスタンスが起動した場合、失敗したインスタンスの任意の状態から復元し、安全なポイント(safe checkpoint)から処理を開始する必要がある。これには、消費するオフセット位置と実際に生成されたメッセージ出力を同期させる必要がある

Kafka の Exactly-once セマンティクス保証#

冪等性:パーティション内部の exactly-once セマンティクス保証#

冪等性とは、同じ操作を複数回実行しても結果が一貫していることを指す。Producerのsend()操作は現在冪等である。プロデューサーが重試行を引き起こす場合、同じメッセージがプロデューサーによって複数回送信されても、1 回だけ書き込まれる。ブローカーの設定を変更する必要がある: enable.idempotence = trueでこの機能を有効にする

原理は、TCP における信頼性のある伝送の累積確認メカニズムの実現に似ている:
Producer内の各 RecordBatch には単調増加する seq がある;Broker上の各トピックもpid-seqのマッピングを維持し、各 Commit でlastSeqを更新する。これにより、recordBatch が到着したとき、ブローカーはまず RecordBatch をチェックしてからデータを保存する:バッチ内のbaseSeq(最初のメッセージのseq)がブローカーが維持するシーケンス番号(lastSeq)よりも大きい場合、データを保存し、そうでなければ破棄する:

  • 大きい場合、間にデータが書き込まれていないことを示し、この場合ブローカーはこのメッセージを拒否し、プロデューサーはInvalidSequenceNumberをスローする。これにより、前のメッセージの送信が失敗し、次のメッセージが成功し、前のメッセージが再試行された後に成功することで発生するデータの順序の乱れの問題が解決される
  • 小さいか等しい場合、ブローカーが維持するシーケンス番号を示し、重複メッセージであるため、ブローカーはこのメッセージを直接破棄し、プロデューサーはDuplicateSequenceNumberをスローする

トランザクション保証:複数パーティションにまたがる原子操作#

Kafka はトランザクション API を提供し、メッセージを複数パーティションにまたがって原子性の書き込み操作を行う:

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch(ProducerFencedException e) {
  producer.close();
} catch(KafkaException e) {
  producer.abortTransaction();
}

Transaction IDepochを導入し、Transaction ID と PID は一対一で対応する可能性があるが、Transaction ID はユーザーが提供し、PID は Kafka 内部の実装であり、ユーザーには透明である。これにより、以下が保証される:

  • セッションをまたぐデータの冪等送信: 同じ Transaction ID を持つ新しいプロデューサーインスタンスが作成されて動作している場合、epochは単調増加し、古いプロデューサーの epoch が新しいプロデューサーの epoch より小さいため、古いプロデューサーで同じ Transaction ID を持つものはもはや動作しない
  • セッションをまたぐトランザクションの回復: あるアプリケーションインスタンスがダウンした場合、新しいインスタンスは未完了の古いトランザクションがコミットまたは中止されることを保証し、新しいインスタンスが正常な状態から作業を開始できる

コンシューマーの観点から見ると、トランザクション保証は相対的に弱い。特に、あるトランザクションでコミットされたすべてのメッセージが一緒に消費されることを保証できない。なぜなら:

  • 圧縮されたトピックの場合、同じトランザクションの一部のメッセージが他のバージョンに上書きされる可能性がある
  • トランザクションに含まれるメッセージが複数のセグメントに分散される可能性がある(同じパーティション内でも)、古いセグメントが削除されると、そのトランザクションの一部のデータが失われる可能性がある
  • コンシューマーはトランザクション内で任意のオフセットのメッセージにアクセスするために seek メソッドを使用する可能性があり、そのため一部のメッセージが失われる可能性がある
  • コンシューマーはトランザクション内のすべてのパーティションを消費する必要がないため、そのトランザクションを構成するすべてのメッセージを読み取ることは決してない

開発実装の考え方#

オフセット管理をカスタマイズし、メッセージの処理とオフセットの偏移量を同じトランザクション内で保証する必要がある。例えば、メッセージ処理のデータを MySQL に書き込むと同時に、現在のメッセージオフセットを更新する

実装:

  1. enable.auto.commit = falseを設定し、メッセージオフセットの自動コミットを無効にする
  2. メッセージを処理する際にオフセットを保存する(例えば、MySQL に保存する)
  3. パーティションが変更されると、パーティションの再バランスが発生する可能性があるため、カスタムクラスを実装してConsumerRebalanceListenerインターフェースを実装し、イベントの変化をキャッチし、オフセットを処理する
  4. パーティションの再割り当てが完了した後、コンシューマーがメッセージを読み取る前に、seek(TopicPartition, long)メソッドを呼び出して、指定されたパーティションのオフセット位置に移動する

参考リンク#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。