Yige

Yige

Build

調度システム - Airflow

スケジューリングシステム - Airflow#

スケジューリングシステムの選定#

現在のオープンソーススケジューリングシステムは 2 つのカテゴリに分かれます:

  • 定時型スケジューリングシステムは、設計の核心が定期的な実行、データの分割、弾力的なスケーリングにありますが、依存関係のサポートがあまり親切ではなく、バックエンドビジネス開発により適しています。代表的なものは XXL-JOBElastic-Job です。

  • DAG を中心としたワークフロースケジューリングシステムは、タスクの依存関係を強調しており、一般的なものには以下があります:

    1. Oozie:Oozie は XML 形式で開発され、後に Hue に統合されて視覚的に設定できますが、欠点も明らかで、バージョン管理やログ収集があまり親切ではなく、開発の柔軟性が非常に低く、スケジュール可能なタスクも少なく、定義が非常に複雑で、メンテナンスコストが高いです。もちろん、最も重要なのは共用変数や共用接続情報の概念がないことです。
    2. Azkaban:Oozie とほぼ同様で、最も核心的な問題は共用変数や共用接続情報の概念がないことです。
    3. Airflow
    4. Dolphinscheduler:最近オープンソース化された Apache の孵化プロジェクトで、中国人が開発し貢献しています。

Airflow 基本使用#

基本概念#

  • DAG :有向非循環グラフを意味し、Airflow では全体の作業を定義します。同じ DAG 内のすべてのタスクは同じスケジュール時間を持ちます。

  • Task:DAG 内の具体的な作業タスクであり、特定の DAG 内に存在する必要があります。タスクは DAG 内で依存関係を設定し、DAG を跨ぐ依存は可能ですが推奨されません。DAG を跨ぐ依存は DAG グラフの直観性を低下させ、依存管理に問題を引き起こします。

  • DAG Run:DAG がそのスケジュール時間を満たすか、外部からトリガーされると、DAG Run が生成されます。DAG によってインスタンス化されたインスタンスと理解できます。

  • Task Instance:タスクがスケジュールされて起動されると、タスクインスタンスが生成されます。タスクによってインスタンス化されたインスタンスと理解できます。

基本アーキテクチャ#

Airflow はメタデータベース上に構築されたキューシステムです。データベースはキュータスクの状態を保存し、スケジューラはこれらの状態を使用して他のタスクをキューに追加する優先度を決定します。この機能は 4 つの主要コンポーネントによって編成されています:

  • メタデータベース:このデータベースはタスク状態に関する情報を保存します。データベースは SQLAlchemy で実装された抽象層を使用して更新を実行します。この抽象層は Airflow の残りのコンポーネント機能をデータベースからきれいに分離します。

  • スケジューラ:スケジューラは DAG 定義とメタデータ内のタスク状態を組み合わせて、どのタスクを実行する必要があるか、またタスク実行の優先度を決定するプロセスです。スケジューラは通常サービスとして実行されます。

  • 実行器:Executor はメッセージキュープロセスで、スケジューラにバインドされ、各タスク計画を実行する作業プロセスを決定します。異なるタイプの実行器があり、各実行器は指定された作業プロセスのクラスを使用してタスクを実行します。例えば、LocalExecutor はスケジューラプロセスと同じマシン上で実行される並行プロセスを使用してタスクを実行します。他の CeleryExecutor のような実行器は、独立した作業マシンクラスタ内に存在する作業プロセスを使用してタスクを実行します。

  • ワーカー:これらは実際にタスクロジックを実行するプロセスで、使用されている実行器によって決定されます。

Airflow の操作はタスク状態とワークフローのメタデータベース(すなわち DAG)に基づいています。スケジューラと実行器はタスクをキューに送信し、ワーカープロセスが実行します。Web サーバーは(しばしばスケジューラと同じマシンで実行され)データベースと通信し、Web UI にタスク状態とタスク実行ログを表示します。各色付きボックスは、各コンポーネントが他のコンポーネントから独立して存在できることを示しており、これはデプロイ構成のタイプによって異なります。

インストールとデプロイ#

  1. pip インストール: pip install apache-airflow

  2. 環境変数を変更して新しいディレクトリをAIRFLOW_HOMEとして指定します。

  3. 初回にairflowコマンドを実行すると、AIRFLOW_HOME ディレクトリに airflow.cfg ファイルが初期化生成されます。

[root@server ~]# airflow
  1. mysql で新しいデータベースairflowを作成し、権限を設定します。
  mysql > create database airflow default character set utf8;
  mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  1. データベース airflow を初期化します。
[root@server ~]# airflow initdb
  1. airflow.cfg を変更します。
web_server_host = IP
web_server_port = HOST
executor = LocalExecutor
sql_alchemy_conn = mysqlデータベースアドレス
  1. デーモンプロセスを起動します(supervisord を利用してバックグラウンドで実行)。
[root@server ~]# airflow webserver
[root@server ~]# airflow sheduler

基本使用#

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'test',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='50 * * * *')
)

# オペレーターをインスタンス化して作成されたタスクの例
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

# 依存関係の設定
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

注意点#

タイムゾーンの問題#

システム起動時の時間やデータベースメタデータへの書き込み時間は、タイムゾーンを設定することができます。airflow の設定ファイル airflow.cfg を通じてこのタイムゾーンを変更できます。しかし、Airflow が提供する webserver 上、すなわち Web インターフェースで表示される時間はデフォルトで UTC+0 の時間です。したがって、実際のスケジュール時間と Web ページに表示される時間が異なることがあります。解決策は、こちらのブログairflow 中国のタイムゾーンを変更する(airflow ソースコードを変更)を参考にしてください。

Backfill#

Backfill とは、サーバーのダウンやその他の理由でタスクがスケジュールされなかった場合に、再起動後に Airflow が自動的に実行を補充することを指します。この動作を手動で実行することもできます:

[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

これは、類似のチェックポイント復元メカニズムと見なすことができ、実際には非常に良い機能ですが、自動トリガー時に、airflowはシステムの現在の時間から指定された start_date(上記の DAG 設定時に指定されたパラメータ)までのタスクを自動的に補充実行しようとするため、時には予期しない問題が発生することがあります。
例を挙げると:
私はこの一連のタスクを設定しました。30 分ごとにスケジュールを実行し、サーバーがしばらくダウンしていた後、復旧後に Airflow が自動的に Backfill をトリガーしました。システムの現在の時間から指定された start_date までの時間がまだ長く、多くのタスクが積み重なり、Backfill の実行が始まりました。30 分経っても最初のタスクの Backfill が完了せず、30 分の間隔のために新しいタスクが追加され、最終的にはタスクの積み重なりのためにタスクを実行するサーバーに問題が発生する可能性があります。

同時スケジューリングの問題#

ある時、スケジューラプロセスがクラッシュし、再起動するとすべてのタスクが一度に実行されることがわかりました。さらに、各 DAG の数日間実行されていなかったタスクが直接実行され、サーバーに大きな負荷がかかりました。
グローバルファイル airflow.cfg のパラメータを変更して同時実行を減らす:

# Executorに対する並行性の量。この設定は
# このAirflowインストールで同時に実行されるべきタスクインスタンスの最大数を定義します
parallelism = 32

# スケジューラによって同時に実行されることが許可されているタスクインスタンスの数
dag_concurrency = 16

# DAGは作成時にデフォルトで一時停止されますか
dags_are_paused_at_creation = True

# プールを使用しない場合、タスクは「デフォルトプール」で実行されます。
# この設定要素によってサイズが決まります
non_pooled_task_slot_count = 128

# 各DAGごとの最大アクティブDAG実行数
max_active_runs_per_dag = 16

補足#

Airflow のデーモンプロセスの作業メカニズム:

  • スケジューラは定期的にメタデータベース(Metastore)に登録された DAG(有向非循環グラフ、作業フローと理解できます)をポーリングして、実行が必要かどうかを確認します。特定の DAG がそのスケジュールに従って実行する必要がある場合、スケジューラデーモンはまずメタデータベースに DAG Run のインスタンスを作成し、DAG 内部の具体的なタスク(タスクは DAG に含まれる 1 つ以上のタスクと理解できます)をトリガーします。トリガーは実際にタスクを実行するのではなく、タスクメッセージをメッセージキュー(ブローカー)にプッシュします。各タスクメッセージには、このタスクの DAG ID、タスク ID、および実行する必要がある関数が含まれます。タスクが bash スクリプトを実行する場合、タスクメッセージには bash スクリプトのコードも含まれます。

  • ユーザーは webserver 上で DAG を制御することができ、例えば手動で DAG をトリガーして実行することができます。この場合、メタデータベースに DAG Run のインスタンスが作成され、スケジューラは #1 と同様の方法で DAG 内の具体的なタスクをトリガーします。

  • ワーカーデーモンはメッセージキューを監視し、メッセージがあればメッセージキューからメッセージを取得します。タスクメッセージを取得すると、メタデータ内の DAG Run インスタンスの状態を「実行中」に更新し、DAG 内のタスクを実行しようとします。DAG が成功裏に実行されれば、DAG Run インスタンスの状態を「成功」に更新し、そうでなければ「失敗」に更新します。

参考リンク#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。