Yige

Yige

Build

Hadoopシリーズ-HDFS

Hadoop シリーズ - HDFS#

一、HDFS の起動プロセス#

  1. ファイルのメタ情報を読み込む
  2. ログファイルを読み込む
  3. チェックポイントを設定する
  4. セーフモードに入る。目的は:データブロックのレプリカ率、冗長度が要求を満たしているかを確認すること

二、HDFS の運用メカニズム#

ユーザーファイルはブロックに分割され、複数の DataNode サーバーに保存され、各ファイルはクラスタ全体に複数のレプリカを持つことで、データの安全性を向上させることができる

三、基本アーキテクチャ#

  • NameNode: 全体のファイルシステムの管理ノードで、ファイルがどのようにブロックデータに分割されるかを記録し、これらのデータブロックのストレージ情報を記録する

    1. Fsimage: メタデータがハードディスク上に保存されるイメージファイル
    2. edits: システム操作のログ記録ファイル
    3. Fstime: 最後のチェックポイントの時間を保存
    4. seen_txid: 最後の edits の数字
    5. version
  • Secondary NameNode: 補助バックグラウンドプログラム(NameNode の災害復旧ノードではない)、NameNode と通信し、定期的に HDFS メタデータのスナップショットを保存する

  • DataNode: データノードで、HDFS データブロックをローカルファイルシステムに読み書きする

HDFS が小さなファイルの保存に適していない理由は、各ファイルがメタ情報を生成し、小さなファイルが増えるとメタ情報も増え、NameNode に負担をかけるからである

フェデレーション HDFS#

各 NameNode は 1 つの名前空間を維持し、異なる NameNode 間の名前空間は相互に独立している。各 DataNode は各 NameNode に登録する必要がある

  • 複数の NN が 1 つのクラスタ DN のストレージリソースを共有し、各 NN は独自にサービスを提供できる。

  • 各 NN はストレージプールを定義し、独自の ID を持ち、各 DN はすべてのストレージプールにストレージを提供する。

  • DN はストレージプール ID に従って対応する NN にブロック情報を報告し、同時に DN はすべての NN にローカルストレージの可用リソース状況を報告する。

  • クライアントが複数の NN のリソースに便利にアクセスする必要がある場合、クライアントマウントテーブルを使用して異なるディレクトリを異なる NN にマッピングすることができるが、NN には対応するディレクトリが存在する必要がある

四、NameNode の作業メカニズム#

NameNode 作業メカニズム.png
HDFS の読み書き -> ログをロール -> SN が NN にチェックポイントが必要かどうかを問い合わせる -> 時間が来た(60 分)または edits データが満杯になったらチェックポイントをトリガー -> SN がチェックポイントの実行をリクエスト -> NN が edits ファイルと fsimage ファイルを SN にコピー -> SN がマージされた fsimage を NN に同期する

五、DataNode の作業メカニズム#

DataNode ノード作業メカニズム.jpeg

  1. DataNode が起動して NN に登録し、その後定期的にブロックデータの情報を報告する
  2. NN と DataNode 間はハートビート検出メカニズムを介して、ハートビートは 1 回 / 3 秒で、10 分以上ハートビートが受信されない場合はノードが使用不可と見なされる

六、HDFS のデータ読み取りプロセス#

HDFS 読み取りプロセス.png

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("demo.txt");
FSDataInputStream inStream = fs.open(file);
String data = inStream.readUTF();
System.out.println(data);
inStream.close();
  1. クライアントがFileSystemオブジェクトを初期化し、open()メソッドを呼び出してDistributedFileSystemオブジェクトを取得する
  2. DistributedFileSystem が RPC を介して NN に最初のバッチのブロックロケーションを取得するようリクエストする
  3. 前の 2 つのステップで FSDataInputStream が生成され、このオブジェクトは DFSInputStream オブジェクトにラップされる
  4. クライアントがread()メソッドを呼び出すと、DFSInputStream はクライアントに最も近い DataNode を見つけて接続し、このファイルの最初のデータブロックの読み取りを開始する。データは DataNode からクライアントに転送される
  5. 最初のデータブロックの読み取りが完了すると、DFSInputStream は接続を閉じ、次のデータブロックの DataNode に接続してデータ転送を続ける
  6. データを読み取る際に DFSInputStream と DataNode の通信に異常が発生した場合、次のデータブロックを含む DataNode に接続を試み、どの DataNode にエラーが発生したかを記録し、残りのブロックを読み取る際にはその DataNode をスキップする
  7. クライアントがデータの読み取りを完了した後、close()メソッドを呼び出して接続を閉じ、データをローカルファイルシステムに書き込む

七、HDFS のデータ書き込みプロセス#

HDFS 書き込みプロセス.png

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("demo.txt");
FSDataOutputStream outStream = fs.create(file);
out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8"));
outStream.close();
  1. クライアントがcreateメソッドを呼び出し、ファイル出力ストリームDFSDataOutputStreamオブジェクトを作成し、NN にファイルのアップロードをリクエストする
  2. DistributedFileSystem が RPC を介して NN にブロックに関連付けられていない新しいファイルエントリを作成するように呼び出す。作成前に NN はファイルが存在するか、作成する権限があるかを検証し、成功した場合は操作を EditLog(WAL,write ahead log)に書き込み、出力ストリームオブジェクトを返す。そうでなければ IO 例外をスローする
  3. クライアントはまずファイルを分割する。例えば、1 つのブロックが 128M で、ファイルが 300M の場合、3 つのブロックに分割され、2 つが 128M、1 つが 44M となり、その後 NN にどの DataNode サーバーにブロックを送信するかをリクエストする
  4. NN が書き込み可能な DN ノード情報を返し、クライアントと NameNode が割り当てた複数の DataNode がパイプラインを構成し、接続を確立し、クライアント側が出力ストリームオブジェクトにデータを書き込む
  5. FSDataOutputStream オブジェクトを介して DN にデータを書き込み、これらのデータは小さなパケットに分割され、データキューに並べられる。クライアントが最初の DataNode に 1 つのパケットを書き込むと、そのパケットはパイプライン内で次の DataNode、3 番目の DataNode に直接渡される。1 つのブロックまたはファイル全体を書き終えてから次に配布するのではない
  6. 各 DN が 1 つのブロックを書き終えると、クライアントにack確認情報を応答する。注意すべきは、各パケットを書き終えた後に確認情報を返すのではない
  7. クライアントがデータの書き込みを完了した後、closeメソッドを呼び出してストリームを閉じる

補足:

  • クライアントが write 操作を実行した後、書き終えたブロックのみが可視で、書き込み中のブロックはクライアントには見えないsyncメソッドを呼び出すことで、クライアントはそのファイルの書き込み操作がすべて完了したことを確認できる。クライアントが close メソッドを呼び出すと、デフォルトで sync メソッドが呼び出される。手動で呼び出す必要があるかどうかは、プログラムの要件に応じてデータの堅牢性とスループットの間のバランスによる

書き込み中のエラー処理:#

書き込みプロセスでの DN レプリカノードのエラー:#

  1. まずパイプラインが閉じられる
  2. すでにパイプラインに送信されたが確認を受け取っていないパケットをデータキューに書き戻し、データの損失を防ぐ
  3. 次に、正常に動作している DN データノードに新しいバージョン番号が付与される(NameNode のリース情報を利用して最新のタイムスタンプバージョンを取得できる)。こうすることで、故障ノードが復旧した際にバージョン情報が一致しないため、故障した DataNode は削除される
  4. 故障ノードを削除し、正常な DN データノードを選択してパイプラインを再構築し、データの再書き込みを開始する
  5. レプリカが不足している場合、NN は他の DN ノードに新しいレプリカを作成する

書き込みプロセスでのクライアントのクラッシュ解決:#

データの書き込み中にクライアントが異常終了した場合、同一ブロックデータの異なるレプリカが不一致の状態になる可能性がある。特定のレプリカを主データノードとして選択し、他のデータノードを調整し、NN はリースメカニズムを介してすべての DN レプリカノードがこのデータブロック情報を持つ最小ブロック長を見つけ、そのデータブロックを最小長に復元する

詳細は参照: HDFS 復元プロセス 1

八、HDFS レプリケーションメカニズム#

最初のレプリカ:アップロードノードが DN の場合、そのノードにアップロードされる;アップロードノードが NN の場合、DN をランダムに選択する
2 番目のレプリカ:異なるラックの DN に配置される
3 番目のレプリカ:2 番目のレプリカと同じラックの異なる DN に配置される

九、HDFS セーフモード#

セーフモードは HDFS の作業状態の 1 つで、セーフモードにある状態では、クライアントにファイルの読み取り専用ビューのみを提供し、名前空間の変更を受け付けない

  • NN が起動すると、最初に fsimage をメモリに読み込み、その後 edits ログ記録の操作を実行する。メモリ内でファイルシステムメタデータのマッピングが成功裏に確立された後、新しい fsimage と空の edits 編集ログファイルが作成され、この時 NN はセーフモードで動作する
  • この段階で NN は DN から情報を収集し、各ファイルのデータブロックを統計し、最小レプリカ条件が満たされていることを確認すると、一定の割合のデータブロックが最小レプリカ数に達している場合、セーフモードを終了する。満たされない場合、DN に対してレプリカ数が不足しているデータブロックの複製を手配し、最小レプリカ数に達するまで続ける
  • フォーマットされたばかりの HDFS を起動すると、データブロックがないためセーフモードには入らない

セーフモードを終了するには:hdfs namenode -safemode leave

十、HA 高可用メカニズム#

参照: Hadoop NameNode 高可用 (High Availability) 実装解析

基本アーキテクチャの実装#

HDFS の HA 高可用性は zk によって保証され、基本アーキテクチャ:
image.png

  • アクティブNameNodeスタンバイNameNode: 主備 NameNode ノードで、アクティブな主 NameNode ノードのみが外部にサービスを提供する

  • 共有ストレージシステム: NN の運用中に生成されたメタデータを保存し、主備 NN は共有ストレージシステムを介してメタデータの同期を実現し、主備切り替えを行う際には、新しい NN がメタデータが完全に同期されていることを確認してから外部にサービスを提供できる

  • 主備切り替えコントローラー ZKFailoverController: ZKFC は独立したプロセスとして実行され、NN の健康状態を迅速に監視し、主 NN に障害が発生した場合、zk クラスターを利用して主備自動選挙切り替えを実現する

  • DataNode: DataNode は主備 NN にデータブロック情報を同時にアップロードして HDFS のデータブロックと DataNode 間のマッピング関係を同期させる必要がある

  • Zookeeper クラスター:主備切り替えコントローラーに主備選挙のサポートを提供する

主備切り替えの実装#

参照: Hadoop NameNode 高可用 (High Availability) 実装解析

NameNode の主備切り替えは主にZKFailoverControllerHealthMonitor、およびActiveStandbyElectorの 3 つのコンポーネントによって協調して実現される:

  • 主備切り替えコントローラー ZKFailoverController が起動すると、HealthMonitor と ActiveStandbyElector という 2 つの主要な内部コンポーネントが作成され、ZKFailoverController は HealthMonitor と ActiveStandbyElector にそれぞれのコールバックメソッドを登録する。

  • HealthMonitor は主に NameNode の健康状態を検出し、NameNode の状態が変化した場合、ZKFailoverController の対応するメソッドをコールバックして自動的に主備選挙を行う。

  • ActiveStandbyElector は主に自動的な主備選挙を完了し、内部に Zookeeper の処理ロジックをカプセル化し、Zookeeper の主備選挙が完了すると、ZKFailoverController の対応するメソッドをコールバックして NameNode の主備状態を切り替える

image.png
図のように、プロセス分析:

  1. HealthMonitor が初期化を完了すると、内部スレッドが定期的に対応する NameNode の HAServiceProtocol RPC インターフェースのメソッドを呼び出し、NameNode の健康状態を検出する
  2. NN の状態に変化が検出されると、ZKFailoverController の対応するメソッドがコールバックされ、処理が行われる
  3. ZKFailoverController が主備切り替えが必要であることを検出すると、ActiveStandbyElector を使用して処理を行う
  4. ActiveStandbyElector が ZK と対話して自動選挙を完了し、ZKFailoverController の対応するメソッドをコールバックして現在の NN を通知する
  5. ZKFailoverController が対応する NameNode の HAServiceProtocol RPC インターフェースのメソッドを呼び出して、NameNode をアクティブ状態またはスタンバイ状態に切り替える
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。