Yige

Yige

Build

Scheduling System - Airflow

Scheduling System - Airflow#

Selection of Scheduling Systems#

Current open-source scheduling systems can be divided into two categories:

  • Time-based scheduling systems, where the core design focuses on scheduled execution, data sharding, and elastic scaling, but they are not very friendly in terms of dependency support, making them more suitable for backend business development. Representatives include XXL-JOB and Elastic-Job.

  • Workflow scheduling systems centered around DAGs, which emphasize task dependencies. Common examples include:

    1. Oozie: Oozie is developed based on XML format and can be visually configured after integration into Hue. However, its drawbacks are also evident, such as poor version management and log collection, low development flexibility, and a limited number of schedulable tasks. Additionally, its definitions are overly complex, leading to high maintenance costs. The core issue is the lack of the concept of shared variables and shared connection information.
    2. Azkaban: Similar to Oozie, the core issue remains the lack of the concept of shared variables and shared connection information.
    3. Airflow
    4. Dolphinscheduler: A recently open-sourced Apache incubator project, developed and contributed by Chinese developers.

Basic Usage of Airflow#

Basic Concepts#

  • DAG: Stands for Directed Acyclic Graph, which defines the entire job in Airflow. All tasks within the same DAG share the same scheduling time.

  • Task: A specific job task within a DAG, which must exist within a particular DAG. Tasks in a DAG configure dependencies, and cross-DAG dependencies are possible but not recommended. Cross-DAG dependencies can reduce the intuitiveness of the DAG graph and complicate dependency management.

  • DAG Run: A DAG Run occurs when a DAG meets its scheduling time or is triggered externally. It can be understood as an instance instantiated by the DAG.

  • Task Instance: A Task Instance is created when a Task is scheduled to start. It can be understood as an instance instantiated by the Task.

Basic Architecture#

Airflow is a queue system built on a metadata database. The database stores the status of queued tasks, and the scheduler uses these statuses to determine how to prioritize adding other tasks to the queue. This functionality is orchestrated by four main components:

  • Metadata Database: This database stores information about task statuses. The database executes updates using an abstraction layer implemented in SQLAlchemy. This abstraction layer cleanly separates the functionality of the remaining components of Airflow from the database.

  • Scheduler: The scheduler is a process that uses the DAG definitions combined with task statuses in the metadata to decide which tasks need to be executed and the execution priority of those tasks. The scheduler typically runs as a service.

  • Executor: The Executor is a message queue process that is bound to the scheduler, used to determine the worker processes that will actually execute each scheduled task. There are different types of executors, each using a specified worker process class to execute tasks. For example, LocalExecutor executes tasks using parallel processes running on the same machine as the scheduler process. Other executors, like CeleryExecutor, use worker processes that exist in a separate cluster of worker machines.

  • Workers: These are the processes that actually execute the task logic, determined by the executor in use.

The operation of Airflow is built on the metadata database that stores task statuses and workflows (i.e., DAGs). The scheduler and executor send tasks to the queue for worker processes to execute. The WebServer runs (often on the same machine as the scheduler) and communicates with the database to present task statuses and execution logs in the Web UI. Each colored box indicates that each component can exist independently of the others, depending on the type of deployment configuration.

Installation and Deployment#

  1. Install via pip: pip install apache-airflow

  2. Modify the environment variable to create a new directory specified as AIRFLOW_HOME.

  3. The first execution of the airflow command will initialize and generate the airflow.cfg file in the AIRFLOW_HOME directory.

[root@server ~]# airflow
  1. Create a new MySQL database airflow and configure permissions.
  mysql > create database airflow default character set utf8;
  mysql > grant all on *.* to airflow@localhost identified by 'airflow_test';
  1. Initialize the airflow database.
[root@server ~]# airflow initdb
  1. Modify airflow.cfg.
web_server_host = IP
web_server_port = HOST
executor = LocalExecutor
sql_alchemy_conn = MySQL database address
  1. Start the daemon (using supervisord to run in the background).
[root@server ~]# airflow webserver
[root@server ~]# airflow scheduler

Basic Usage#

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'test',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='50 * * * *')
)

# examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

# Setting up Dependencies
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Pitfalls#

Time Zone Issues#

The time when the system starts and the time written into the metadata database can be configured for time zones, which can be changed by modifying the airflow configuration file airflow.cfg. However, on the web server provided by Airflow, the time displayed on the web interface is by default in UTC+0. Therefore, there may be discrepancies between the actual scheduling time and the time displayed on the web page. A solution can be found in this blog post Modifying Airflow's Time Zone to China (Modifying Airflow Source Code).

Backfill#

Backfill generally refers to the automatic execution of tasks that were not scheduled due to server downtime or other reasons when Airflow restarts. This behavior can be manually executed:

[root@server ~]# airflow backfill sensors -s 2015-06-01 -e 2015-06-07

It can be seen as a kind of breakpoint recovery mechanism, which is actually a very good feature. However, when triggered automatically, Airflow will default to backfilling tasks from the current system time to the specified start_date (a parameter specified when configuring the DAG above), which can sometimes lead to unexpected issues.
For example:
I set up a batch of tasks to execute scheduling every half hour, and after the server was down for a while, when it recovered, Airflow automatically triggered Backfill. If there is still a long time from the current system time to the specified start_date, many tasks will accumulate, and when backfilling starts, if I haven't finished executing the first batch of tasks in half an hour, a new batch of tasks may come in due to the half-hour interval, which could eventually cause issues with the server running the tasks due to task accumulation.

Concurrency Scheduling Issues#

Once, due to the scheduler process crashing, when it restarted, I found that all tasks were executed at once. Even tasks that had not run for several days in each DAG were triggered, putting a lot of pressure on the server.
Modify the global parameters in airflow.cfg to reduce concurrency:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# Are DAGs paused by default at creation
dags_are_paused_at_creation = True

# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

Additional Information#

The working mechanism of Airflow's daemon processes:

  • The scheduler will periodically poll the metadata database (Metastore) to check if the registered DAGs (Directed Acyclic Graphs, which can be understood as job flows) need to be executed. If a specific DAG needs to be executed according to its scheduling plan, the scheduler daemon will first create a DagRun instance in the metadata database and trigger the specific tasks within the DAG (tasks can be understood as: a DAG contains one or more tasks). The triggering does not actually execute the tasks but pushes task messages to the message queue (i.e., broker). Each task message contains the DAG ID, task ID, and the specific function that needs to be executed. If the task is to execute a bash script, the task message will also contain the code of the bash script.

  • Users may control the DAG on the web server, such as manually triggering a DAG to execute. When users do this, a DagRun instance will be created in the metadata database, and the scheduler will trigger the specific tasks within the DAG in the same way as described in #1.

  • The worker daemon will listen to the message queue. If there are messages, it will retrieve them from the queue. When it retrieves a task message, it will update the status of the DagRun instance in the metadata to "running" and attempt to execute the tasks in the DAG. If the DAG executes successfully, it will update the status of the DagRun instance to "success"; otherwise, it will update the status to "failure".

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.