Apache Airflow Cheatsheet

Alex Gordienko
Published in Geek Culture · Oct 26, 2021 · 6 min read

Apache Airflow is a system to programmatically author, schedule, and monitor data pipelines. It has been six years since version 1.0.0 of Apache Airflow was released: years of coding, fixing, and deploying it, on-premise as well as in the cloud, at hundreds of companies around the world. Many difficulties have been overcome along the way, and this is an attempt to put some of the conclusions in one place to help others save time and make using Airflow easier.

If you find that some statements are not accurate, please do not hesitate to comment; they will be fixed as soon as possible.

General

  1. It’s easy to start a local Airflow environment using the official Docker image: https://hub.docker.com/r/apache/airflow
  2. The Airflow API documentation is worth reading to learn which levers you can use to integrate your external systems with Airflow (a minimal example follows this list): https://airflow.apache.org/docs/apache-airflow/stable/security/api.html
  3. It’s worth reading airflow.cfg (with all the comments inside) to learn more about how you can configure your Airflow installation. Also spend some time going through the project repository to understand how it is built: https://github.com/apache/airflow
  4. Airflow (as well as my beloved Apache Superset) is built on top of the Flask and Flask AppBuilder (FAB) frameworks. This means a lot of features are customisable, e.g. your own security manager. You can read about it here: https://medium.com/geekculture/custom-security-manager-for-apache-superset-c91f413a8be7
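As a quick illustration of point 2, below is a minimal sketch of triggering a dagrun through the Airflow 2.x stable REST API. The host, credentials, and dag_id are placeholders, and it assumes the basic_auth API backend is enabled in airflow.cfg.

```python
# Trigger a dagrun via the stable REST API (Airflow 2.x).
# Host, credentials, and dag_id below are placeholders.
import requests

AIRFLOW_HOST = "http://localhost:8080"  # placeholder
DAG_ID = "example_dag"                  # placeholder

response = requests.post(
    f"{AIRFLOW_HOST}/api/v1/dags/{DAG_ID}/dagRuns",
    auth=("admin", "admin"),            # placeholder credentials (basic_auth backend)
    json={"conf": {"triggered_by": "external_system"}},
)
response.raise_for_status()
print(response.json()["dag_run_id"])
```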

Scan files and DAG configuration

  1. If you want the scheduler to skip files in your DAG folder, just don’t use the words airflow and dag in them: by default, the scheduler only parses files that contain those words in their text. To change this default behaviour, set DAG_DISCOVERY_SAFE_MODE=False in your airflow.cfg
  2. Another way is to ignore unnecessary files via a .airflowignore file in the root of the DAGs folder. It works the same way as .gitignore
  3. Be careful: there will be no errors if different DAGs share the same dag_id. The web UI will simply display one of them at a time, effectively at random.
  4. It might not be intuitive, but start_date is an optional parameter for a DAG. It is obligatory for each task, and it may even differ between tasks of the same DAG, which is counterintuitive.
  5. The default schedule_interval for a DAG is timedelta(days=1)
  6. The dagrun_timeout parameter can help you if you have problems with never-ending dagruns (a minimal DAG showing these parameters follows this list).
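Below is a minimal, purely illustrative DAG sketch that makes the parameters above explicit; the dag_id, dates, and task are placeholders.

```python
# A minimal DAG sketch; dag_id, dates, and the task are placeholders.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_dag_configuration",    # must be unique across the DAG folder
    start_date=datetime(2021, 10, 1),      # set once on the DAG, not per task
    schedule_interval="@daily",            # the default would be timedelta(days=1)
    dagrun_timeout=timedelta(hours=2),     # fail dagruns that run for too long
) as dag:
    DummyOperator(task_id="start")
```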

DAG runs

  1. The catchup parameter is set to True by default. It means that each time you unpause the DAG, all the missed dagruns will start immediately. The best practice is to set it to False for each and every DAG; even better, set CATCHUP_BY_DEFAULT=False in airflow.cfg to change the default behaviour for all DAGs. Note that even with catchup=False, one dagrun will still be started after unpausing.
  2. The first dagrun for a DAG is triggered at start_date + schedule_interval, while its execution_date equals start_date (the beginning of the interval it covers).
  3. You can create timezone-aware DAGs using the pendulum module (see the sketch after this list). In versions ≥1.10.7, if a timezone is specified for the DAG, cron expressions respect daylight saving time (DST), but timedelta objects do not. In Airflow versions prior to 1.10.7 it was the other way around: timedelta objects respected DST, but cron expressions did not.
  4. Do not treat Airflow as a do-everything tool. It is a very powerful process orchestrator, or, as some people say, cron on steroids. Use it for triggering jobs outside your Airflow system. For example, if you want to load data from PostgreSQL or MySQL into ClickHouse, use ClickHouse’s own mechanism for loading data from external tables where applicable, instead of writing your own custom operator or combining PostgresOperator with ClickHouseOperator. Some people even think that DockerOperator and KubernetesPodOperator should be the only two types of operators you use in a production environment.
  5. If you use an AWS environment and need to start a job when a new file arrives in an S3 bucket, it is better to use AWS Lambda to trigger your dagrun instead of using sensors. Here is a useful article on Lambdas: https://medium.com/swlh/aws-lambda-run-your-code-for-free-1c7fa6714ee9
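A sketch of a timezone-aware DAG with catchup disabled; the timezone, dag_id, and cron expression are placeholders.

```python
# Timezone-aware DAG with catchup disabled; timezone and schedule are placeholders.
import pendulum
from airflow import DAG
from airflow.operators.dummy import DummyOperator

local_tz = pendulum.timezone("Europe/Amsterdam")   # placeholder timezone

with DAG(
    dag_id="example_timezone_dag",
    start_date=pendulum.datetime(2021, 10, 1, tz=local_tz),
    schedule_interval="0 6 * * *",   # cron expression respects DST for this timezone
    catchup=False,                   # do not backfill missed runs when unpausing
) as dag:
    DummyOperator(task_id="noop")
```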

Variables and connections

  1. It is possible to hide the values of some variables. You can find which fields will be masked in DEFAULT_SENSITIVE_VARIABLE_FIELDS in airflow.cfg.
  2. You can get a dict() object from a JSON variable value by passing deserialize_json=True to Variable.get(). It can help decrease the number of connections to the Airflow database.
  3. You will benefit from putting Variable.get() statements inside function definitions instead of using them in the top-level DAG code (see the sketch after this list). Otherwise, the scheduler will open database connections every time it scans the DAG folder.
  4. It is possible to decrease database connections without moving Variable.get() statements into function definitions by using environment variables. Environment variables whose names start with AIRFLOW_VAR_* are checked before UI variables and stop any further variable lookup if present. Please be advised that such variables will not be visible in the UI.
  5. Another way is to use Jinja templating. Templated fields of a task are rendered only at task run time, not when the scheduler periodically scans the DAG folder. To read a variable in a templated field, use op_args=["{{ var.json.varname.jsonkey }}"]
  6. You can also create Airflow connections from environment variables by using the AIRFLOW_CONN_* pattern in the name.
  7. Since version 1.10.10 it is possible to use custom secrets backends such as AWS Secrets Manager or GCP Secret Manager.
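A sketch combining several of these patterns; the variable name my_config, the JSON key target_table, and the connection my_postgres are illustrative assumptions.

```python
# Variables read lazily inside the callable and via Jinja templating.
# The variable "my_config" (a JSON value) and its key "target_table" are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def process(templated_value, **context):
    # Variable.get() lives inside the callable, so the scheduler does not hit
    # the metadata database on every DAG-folder scan.
    config = Variable.get("my_config", deserialize_json=True)  # returns a dict
    print(config, templated_value)


with DAG(
    dag_id="example_variables",
    start_date=datetime(2021, 10, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="process",
        python_callable=process,
        # Jinja is rendered at task run time only, not while the scheduler parses the file.
        op_args=["{{ var.json.my_config.target_table }}"],
    )

# The same variable/connection can also be defined through environment variables
# (they will not be visible in the UI):
#   AIRFLOW_VAR_MY_CONFIG='{"target_table": "events"}'
#   AIRFLOW_CONN_MY_POSTGRES='postgres://user:pass@host:5432/db'
```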

Task execution parameters

  1. Instead of using XCom to transfer actual data between tasks, you are encouraged to transfer only small pieces of metadata through it.
  2. The depends_on_past parameter prevents a task from being executed until the same task in the previous dagrun has completed successfully (or was skipped).
  3. The wait_for_downstream parameter allows a task to be executed only when the same task and its direct downstream tasks have completed in the previous dagrun.
  4. The task weight_rule='downstream' (which is the default) calculates the absolute task priority weight by summing up the priorities of all downstream tasks. For example, when you have 2 dagruns running and a task pool of size 1, execution of the dagruns will proceed in parallel (a task from one dagrun, then one from the other).
  5. The task weight_rule='upstream' calculates the absolute task priority weight by summing up the priorities of all upstream tasks. For example, when you have 2 dagruns running and a task pool of size 1, tasks from the second dagrun will be executed only after the last task in the first dagrun has completed.
  6. The execution_timeout parameter can be useful for long-running tasks.
  7. trigger_rule is there for you if you want to change the triggering behaviour of a task, which is ALL_SUCCESS by default (see the sketch after this list). The full list of available values can be found here: https://github.com/apache/airflow/blob/main/airflow/utils/trigger_rule.py
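A sketch showing several of these parameters on a single task; the dag_id, the BashOperator, and all values are illustrative only.

```python
# Task-level execution parameters on a single, illustrative task.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_task_parameters",
    start_date=datetime(2021, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    BashOperator(
        task_id="build_report",
        bash_command="echo building report",
        depends_on_past=True,                  # wait for this task's state in the previous dagrun
        wait_for_downstream=True,              # ...and for its direct downstream tasks
        weight_rule="upstream",                # "downstream" is the default
        priority_weight=10,
        execution_timeout=timedelta(minutes=30),
        trigger_rule=TriggerRule.ALL_DONE,     # the default is ALL_SUCCESS
    )
```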

Parallel execution

  1. parallelism (32 by default) — how many tasks can be executed at the same time in the whole Airflow instance (airflow.cfg parameter).
  2. dag_concurrency (16 by default) — how many tasks can be executed at the same time within a single DAG (airflow.cfg parameter; the per-DAG equivalent is the concurrency argument of DAG).
  3. max_active_runs_per_dag (16 by default) — how many runs of the same DAG can be active at the same time (airflow.cfg parameter).
  4. max_active_runs — the same limit set for a specific DAG (DAG parameter); see the sketch after this list.
  5. max_queued_runs_per_dag — the maximum number of queued dagruns for a single DAG; the scheduler will not create more dagruns beyond it (airflow.cfg parameter).
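The DAG-level knobs can be set directly in code, while the instance-wide limits above live in airflow.cfg. A sketch with illustrative values follows; note that in newer 2.x releases the concurrency argument is being renamed to max_active_tasks.

```python
# DAG-level concurrency limits; values are illustrative.
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="example_concurrency",
    start_date=datetime(2021, 10, 1),
    schedule_interval="@hourly",
    catchup=False,
    concurrency=8,        # max tasks of this DAG running at once (max_active_tasks in newer versions)
    max_active_runs=1,    # max simultaneous dagruns of this DAG
) as dag:
    pass
```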

Sensors

  1. poke_interval — how often the sensor will check the condition.
  2. A sensor with mode='poke' occupies a slot in a pool until the condition is met.
  3. A sensor with mode='reschedule' occupies a slot in a pool only while it checks the condition and releases the slot until the next poke.
  4. The timeout parameter (7 days by default) combined with soft_fail=True can put the sensor into the skipped state instead of failed.
  5. exponential_backoff=True replaces the fixed poke_interval with a progressively growing, more dynamic one (see the sketch after this list).
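A sketch of a sensor using these parameters; FileSensor and the file path are placeholders, and the same arguments apply to any sensor built on BaseSensorOperator.

```python
# Sensor parameters on an illustrative FileSensor; the path is a placeholder.
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="example_sensor",
    start_date=datetime(2021, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/ready.flag",  # placeholder path
        poke_interval=300,                     # check every 5 minutes
        mode="reschedule",                     # free the pool slot between pokes
        timeout=60 * 60 * 6,                   # give up after 6 hours...
        soft_fail=True,                        # ...and mark the task skipped, not failed
        exponential_backoff=True,              # stretch the interval between pokes
    )
```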

DAG dependencies

  1. ExternalTaskSensor monitors a specific task_id and its state in another DAG.
  2. TriggerDagRunOperator triggers another DAG and can optionally wait while it is being executed (see the sketch below).
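A sketch of both approaches in one DAG; the external dag_id and task_id values are placeholders, and wait_for_completion is only available in newer 2.x releases of TriggerDagRunOperator.

```python
# Cross-DAG dependencies; the external dag/task ids are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="example_dag_dependencies",
    start_date=datetime(2021, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",     # placeholder
        external_task_id="final_task",      # placeholder
        mode="reschedule",
    )

    trigger_downstream = TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_dag",    # placeholder
        wait_for_completion=True,           # block until the triggered dagrun finishes
    )

    wait_for_upstream >> trigger_downstream
```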

I hope you have enjoyed the article and that this information helps you build efficient data pipelines. If so, please follow me on Medium, GitHub, Twitter, and LinkedIn.
