Yige

Yige

Build

調度系統-Airflow

調度系統 - Airflow#

調度系統的選型#

現在的開源調度系統分為兩類:

  • 以 定時類調度系統,設計核心是定時運行、數據分片和彈性擴容,但是對依賴關係支持的不太友好,更適合於後端業務開發,其代表為 XXL-JOBElastic-Job

  • 以 DAG 為核心的工作流調度系統,強調的是任務的依賴關係,常見有:

    1. Oozie:Oozie 是基於 XML 格式進行開發的,後續集成到 Hue 裡可以可視化配置,但是缺點也很明顯,版本管理、日誌收集都不太友好,開發靈活性很差,可調度的任務也很少,另外定義過於複雜,維護成本很高。當然最核心還是沒有共用變量和共用連接信息的概念。
    2. Azkaban:和 Oozie 差不多,最核心問題還是沒有共用變量和共用連接信息的概念
    3. Airflow
    4. Dolphinscheduler: 剛開源不久的 Apache 孵化項目,國人開發和貢獻的

Airflow 基本使用#

基本概念#

  • DAG :意為有向無循環圖,在 Airflow 中則定義了整個完整的作業。同一個 DAG 中的所有 Task 擁有相同的調度時間。

  • Task: 為 DAG 中具體的作業任務,它必須存在於某一個 DAG 之中。Task 在 DAG 中配置依賴關係,跨 DAG 的依賴是可行的,但是並不推薦。跨 DAG 依賴會導致 DAG 圖的直觀性降低,並給依賴管理帶來麻煩。

  • DAG Run: 當一個 DAG 滿足它的調度時間,或者被外部觸發時,就會產生一個 DAG Run。可以理解為由 DAG 實例化的實例。

  • Task Instance:當一個 Task 被調度啟動時,就會產生一個 Task Instance。可以理解為由 Task 實例化的實例

基本架構#

Airflow 是建立在元數據庫上的隊列系統。數據庫存儲隊列任務的狀態,調度器使用這些狀態來確定如何將其它任務添加到隊列的優先級。此功能由四個主要組件編排:

  • 元數據庫:這個數據庫存儲有關任務狀態的信息。數據庫使用在 SQLAlchemy 中實現的抽象層執行更新。該抽象層將 Airflow 剩餘組件功能從數據庫中乾淨地分離了出來。

  • 調度器:調度器是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。

  • 執行器:Excutor 是一個消息隊列進程,它被綁定到調度器中,用於確定實際執行每個任務計劃的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一台機器上運行的並行進程執行任務。其他像 CeleryExecutor 的執行器使用存在於獨立的工作機器集群中的工作進程執行任務。

  • Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。

Airflow 的操作建立於存儲任務狀態和工作流的元數據庫之上(即 DAG)。調度器和執行器將任務發送至隊列,讓 Worker 進程執行。WebServer 運行(經常與調度器在同一台機器上運行)並與數據庫通信,在 Web UI 中呈現任務狀態和任務執行日誌。每個有色框表明每個組件都可以獨立於其他組件存在,這取決於部署配置的類型

安裝部署#

  1. pip 安裝: pip install apache-airflow

  2. 修改環境變量新建目錄指定為AIRFLOW_HOME

  3. 首次執行airflow命令會在 AIRFLOW_HOME 目錄下初始化生成 airflow.cfg 文件

[root@server ~]# airflow
  1. mysql 新建數據庫airflow並配置權限
  mysql > create database airflow default character set utf8;
  mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  1. 初始化數據庫 airflow
[root@server ~]# airflow initdb
  1. 修改 airflow.cfg
web_server_host = IP
web_server_port = HOST
executor = LocalExecutor
sql_alchemy_conn = mysql數據庫地址
  1. 啟動守護進程 (利用 supervisord 後台掛起)
[root@server ~]# airflow webserver
[root@server ~]# airflow sheduler

基本使用#

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'test',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='50 * * * *')
)

# examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

# Setting up Dependencies
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

踩坑#

時區問題#

系統啟動的時間,以及寫入數據庫元數據的時間確是可以配置時區的,可以通過配置 airflow 的配置文件 airflow.cfg 改變這個時區。但是在 airflow 提供的 webserver 上,即 web 界面上我們看到的時間默認都是 UTC+0 的時間。因此就會存在實際調度時間與 web 頁面顯示的不一樣,解決方案可以參考一下這篇博客airflow 修改中國時區 (改 airflow 源碼)

Backfill#

Backfill,大致來講就是在某些時候因為伺服器宕機或者其他原因導致有些任務沒有調度,然後重啟後 airflow 自動回補執行去調度這些任務,可以使用手動方法來執行這個行為:

[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

可以看做一個類似斷點恢復機制,其實是一個很好的功能,但在自動觸發的時候,因為airflow會默認去回補執行從系統當前時間到我們指定的 start_date(上面配置 dag 的時候指定的一個參數)期間的任務,那麼在有些時候就會出現不可預期的問題.
舉個例子:
我設置了這樣一批任務,每半小時執行一次調度,然後因為伺服器宕機一段時間了,恢復後 airflow 自動觸發了 Backfill,系統當前時間到我們指定的 start_date 時間段還比較長,堆積了很多任務,然後開始回補執行,半個小時我還沒回補執行完第一批任務,然後因為間隔半小時我又有一批新任務加入了進來,可能最後就會因為任務堆積的原因導致跑任務的伺服器出現問題

並發調度問題#

某次因為 scheduler 進程掛掉了,重啟起來發現全部任務被一塊執行了。甚至每個 dag 多天沒有跑完的任務直接起來,導致伺服器一下壓力太大。
修改全局文件 airflow.cfg 的參數降低並發:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

補充#

airflow 的守護進程工作機制:

  • 調度器 scheduler 會間隔性的去輪詢元數據庫(Metastore)已註冊的 DAG(有向無環圖,可理解為作業流)是否需要被執行。如果一個具體的 DAG 根據其調度計劃需要被執行,scheduler 守護進程就會先在元數據庫創建一個 DagRun 的實例,並觸發 DAG 內部的具體 task(任務,可以這樣理解:DAG 包含一個或多個 task),觸發其實並不是真正的去執行任務,而是推送 task 消息至消息隊列(即 broker)中,每一個 task 消息都包含此 task 的 DAG ID,task ID,及具體需要被執行的函數。如果 task 是要執行 bash 腳本,那麼 task 消息還會包含 bash 腳本的代碼。

  • 用戶可能在 webserver 上來控制 DAG,比如手動觸發一個 DAG 去執行。當用戶這樣做的時候,一個 DagRun 的實例將在元數據庫被創建,scheduler 使同 #1 一樣的方法去觸發 DAG 中具體的 task 。

  • worker 守護進程將會監聽消息隊列,如果有消息就從消息隊列中取出消息,當取出任務消息時,它會更新元數據中的 DagRun 實例的狀態為正在運行,並嘗試執行 DAG 中的 task,如果 DAG 執行成功,則更新任 DagRun 實例的狀態為成功,否則更新狀態為失敗

參考鏈接#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。