2 Task Dependencies
Task orchestration often involves ensuring tasks run in a specific order. With gusty, there are three ways to specify task dependencies:
A dependencies block in a task’s Frontmatter, where you can pass a list of task ids in the current DAG upon which the current task depends.
An external_dependencies block in a task’s Frontmatter, where you can pass a list of dag_id: task_id combinations for tasks in other DAGs upon which the current task depends.
A dependencies attribute on your custom operator, which is a list of task ids in the current DAG upon which the current task depends. This powerful option allows task dependencies to be generated dynamically and automatically.
In this section, we’ll focus on the dependencies and external_dependencies blocks, available for use in any Task Definition File’s Frontmatter.
We’ll continue using our hello_dag example from the previous chapter.
2.1 Dependencies
Let’s say that our hello task depended on our hi task running before it. To specify this dependency, we would add the hi task to a list in the dependencies block of the hello.yml Task Definition File:
operator: airflow.operators.bash.BashOperator
dependencies:
- hi
bash_command: echo hello
Now, in our Airflow UI, our DAG graph will show that hi precedes hello.
Remember, in gusty, the file name (minus the file extension) becomes the task id, so you do not need to specify hi.py, just hi.
You can list as many dependencies as you need to for any task.
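As a rough illustration of the two ideas above (file names becoming task ids, and a dependencies list fixing the run order), here is a minimal standard-library sketch. It does not use gusty or Airflow, and it is not how gusty is implemented; the task_files and dependencies variables are hypothetical stand-ins for what gusty would read from your Task Definition Files.

```python
from graphlib import TopologicalSorter
from pathlib import Path

# Hypothetical Task Definition Files in the hello_dag folder.
task_files = ["hi.py", "hello.yml"]

# The file name minus its extension is the task id.
task_ids = [Path(name).stem for name in task_files]

# Hypothetical stand-in for the dependencies block of each file:
# hello lists hi, and hi lists nothing.
dependencies = {"hello": ["hi"], "hi": []}

# A topological sort yields an order in which every task comes
# after the tasks it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())

print(task_ids)
print(run_order)
```

Here hi comes before hello in run_order, which mirrors what the DAG graph shows in the Airflow UI.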
2.2 External Dependencies
A common pattern in Airflow is to have tasks in one DAG depend on tasks in another DAG, or to have one DAG depend completely on another DAG. This behavior is possible in gusty by using the external_dependencies block. The external_dependencies block accepts a list of key-value pairs where each key is a DAG id and each value is a task id.
For each key-value pair listed in the external_dependencies block, gusty will generate an ExternalTaskSensor, a built-in Airflow sensor, and place the resulting sensor task upstream of the given dependent task. If the same external dependency is specified across multiple tasks, gusty will only create one sensor and place this one sensor upstream of all tasks with the specified external dependency.
There are a few ways to configure external dependencies, and we’ll look at all of them below.
Single Task External Dependency
Let’s keep building up our hello.yml Task Definition File.
To specify that our hello task depends on an upstream task, which we’ll call upstream_task, in an upstream DAG, which we’ll call upstream_dag, we add the following external_dependencies block:
operator: airflow.operators.bash.BashOperator
dependencies:
- hi
external_dependencies:
- upstream_dag: upstream_task
bash_command: echo hello
The result will be a new ExternalTaskSensor task with the task id wait_for_upstream_dag_upstream_task, preceding the existing hello task.
As with dependencies, you can list as many external dependencies as you require.
Whole DAG External Dependency
An alternative to specifying a single task for an external dependency is to specify that the entire upstream DAG is the dependency. In this case, we use the special keyword all to configure the ExternalTaskSensor to wait for the entire DAG:
operator: airflow.operators.bash.BashOperator
dependencies:
- hi
external_dependencies:
- upstream_dag: all
bash_command: echo hello
The result will be a new ExternalTaskSensor task with the task id wait_for_DAG_upstream_dag, preceding the existing hello task.
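To make the sensor naming and de-duplication described above concrete, here is a small standard-library sketch. It only mimics the behavior described in this chapter and is not gusty’s actual code: the sensor_task_id helper, the external_dependencies dictionary, and the goodbye task are all hypothetical names invented for the illustration.

```python
def sensor_task_id(dag_id, task_id):
    """Build a sensor task id following the naming shown in the text.

    Hypothetical helper, not part of gusty's API.
    """
    if task_id == "all":
        return f"wait_for_DAG_{dag_id}"
    return f"wait_for_{dag_id}_{task_id}"


# Hypothetical external_dependencies blocks from three Task Definition Files.
external_dependencies = {
    "hello": [{"upstream_dag": "upstream_task"}],
    "hi": [{"upstream_dag": "upstream_task"}],
    "goodbye": [{"upstream_dag": "all"}],
}

# Map each distinct sensor to the tasks that sit downstream of it,
# so a repeated external dependency produces only one sensor.
sensors = {}
for task, block in external_dependencies.items():
    for pair in block:
        for dag_id, task_id in pair.items():
            sensors.setdefault(sensor_task_id(dag_id, task_id), []).append(task)

for sensor_id, downstream in sensors.items():
    print(sensor_id, "->", downstream)
```

Even though hello and hi both list upstream_dag: upstream_task, only one sensor entry is created for that pair, and both tasks are placed downstream of it.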
External Dependencies in METADATA.yml
As an Airflow project grows, you might find that more and more of your tasks have the same external dependency, or sometimes DAGs just logically should depend on one another (e.g. a DAG that ingests data should precede a DAG that transforms that data). For these cases, you can use the exact same external_dependencies block in any METADATA.yml file.
When you specify an external dependency in a METADATA.yml file, the ExternalTaskSensor task will be placed at the root of the DAG, ensuring that no tasks in the DAG run before the ExternalTaskSensor task completes.
If we took the same external dependency from above and placed it in an external_dependencies block in METADATA.yml instead, the ExternalTaskSensor would now precede every other task in the graph.
Offset Schedules
Understandably, but frustratingly, the default behavior of Airflow’s ExternalTaskSensor is to look for DAG runs with the same “logical date”. Suppose you have one DAG, earlier_dag, scheduled to run daily at 00:00 UTC ("0 0 * * *"), and another DAG, later_dag, scheduled to run daily at 06:00 UTC ("0 6 * * *"). If later_dag depends on earlier_dag, the default syntax for an external_dependencies block will not work, because the ExternalTaskSensor in later_dag will be looking for a 06:00 UTC DAG run of earlier_dag, which does not exist.
Fortunately, the external_dependencies block accepts an alternative syntax for this scenario, where:
The keys under external_dependencies are the external DAG ids.
A tasks list is provided for a given external DAG.
Additional configuration for the ExternalTaskSensor class, such as the execution_delta, can be passed in.
For example, to configure later_dag (06:00 UTC) to depend on earlier_dag (00:00 UTC), we could add the following block to later_dag’s METADATA.yml:
external_dependencies:
  earlier_dag:
    execution_delta: !timedelta
      hours: 6
    tasks:
      - all
This will ensure the resulting wait_for_DAG_earlier_dag sensor looks for a successful earlier_dag DAG run at 00:00 UTC (later_dag’s 06:00 UTC run minus 6 hours).
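The date arithmetic behind the offset can be checked with nothing but Python’s datetime module. The sketch below is only a worked example of the subtraction described above; the calendar date is arbitrary and the variable names are made up for the illustration.

```python
from datetime import datetime, timedelta, timezone

# Logical dates for one day's runs, using the schedules from the text.
# The date itself (2024-01-01) is an arbitrary choice for illustration.
earlier_dag_run = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)  # "0 0 * * *"
later_dag_run = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)    # "0 6 * * *"

# Default behavior: the sensor looks for a run with the same logical date.
default_lookup = later_dag_run

# With an execution_delta of 6 hours, the sensor looks that far back instead.
execution_delta = timedelta(hours=6)
offset_lookup = later_dag_run - execution_delta

print("default lookup matches:", default_lookup == earlier_dag_run)
print("offset lookup matches:", offset_lookup == earlier_dag_run)
```

Only the offset lookup lands on the 00:00 UTC run of earlier_dag, which is why the default syntax never finds a matching run.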
Alternative Approaches to Offset Schedules
Custom Sensors
It’s possible to create a custom sensor that “doesn’t care” about the logical date, and just looks at the last/latest DAG run. This ensures you don’t have to worry about setting any offset schedules.
Here is a small snippet inspired by the cal-itp/data-infra repo (which has since deleted it):
from airflow.utils.db import provide_session
from airflow.sensors.external_task_sensor import ExternalTaskSensor

class LastDagRunSensor(ExternalTaskSensor):
    def __init__(self, external_dag_id, external_task_id=None, **kwargs):
        super().__init__(
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            **kwargs)

        # Ignore the current logical date and always use the
        # logical date of the external DAG's last run.
        def dag_last_exec(crnt_dttm):
            return self.get_dag_last_execution_date(self.external_dag_id)

        self.execution_date_fn = dag_last_exec

    @provide_session
    def get_dag_last_execution_date(self, dag_id, session):
        from airflow.models import DagModel

        q = session.query(DagModel).filter(DagModel.dag_id == self.external_dag_id)

        dag = q.first()
        return dag.get_last_dagrun().logical_date
If you want to use this LastDagRunSensor as the sensor class for the external dependencies in your gusty DAG, you can do so with the wait_for_class argument available in create_dag. For example, here’s what your later_dag.py DAG file might look like:
import os
from gusty import create_dag

# Wherever you store the code for the above sensor..
from plugins.sensors import LastDagRunSensor

later_dag_dir = os.path.join(
    os.environ["AIRFLOW_HOME"],
    "dags",
    "later_dag")

later_dag = create_dag(
    later_dag_dir,
    wait_for_class=LastDagRunSensor,
    latest_only=False)
Now all of the external dependencies defined in later_dag’s Task Definition Files will use the custom LastDagRunSensor instead of the default ExternalTaskSensor.
Other External Dependency Considerations
You can configure your external dependencies further using the wait_for_defaults argument in create_dag, which accepts a dictionary of arguments that are available to Airflow’s ExternalTaskSensor. Here is the subset of parameters available in wait_for_defaults:
poke_interval
timeout
retries
mode
soft_fail
execution_delta
execution_date_fn
check_existence
Additionally, anything available to BaseOperator will be passed through.
Set mode to reschedule
By default in Airflow, sensors run in mode="poke", which means they take up a worker slot for the entire time they are waiting for the external task/DAG to complete. You can set mode="reschedule" to free up the worker slot in between “pokes”. Building on the create_dag call in later_dag.py above:
later_dag = create_dag(
    later_dag_dir,
    wait_for_class=LastDagRunSensor,
    wait_for_defaults={
        "mode": "reschedule"
    },
    latest_only=False)
Set a timeout
By default in gusty, external dependencies will time out after 1 hour, or 3600 seconds. If you want to wait longer, you can set your timeout, in seconds:
later_dag = create_dag(
    later_dag_dir,
    wait_for_class=LastDagRunSensor,
    wait_for_defaults={
        "mode": "reschedule",
        "timeout": 7200  # 2 hours in seconds
    },
    latest_only=False)
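If you would rather not hard-code the number of seconds, one option is to derive it from a timedelta. This is just a convenience sketch using the standard library; it builds the same dictionary shape passed to wait_for_defaults above, and the timeout_seconds variable name is made up for the example.

```python
from datetime import timedelta

# Express the wait as a timedelta, then convert it to whole seconds.
timeout_seconds = int(timedelta(hours=2).total_seconds())

# Same dictionary shape as the wait_for_defaults argument shown above.
wait_for_defaults = {
    "mode": "reschedule",
    "timeout": timeout_seconds,
}

print(wait_for_defaults)
```

The resulting dictionary can then be passed as wait_for_defaults in the create_dag call.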
Learn More
If you want to learn more about sensors, check out Airflow’s BaseSensorOperator and Airflow’s BaseOperator.