2  Task Dependencies

Task orchestration often involves ensuring tasks run in a specific order. With gusty, there are three ways to specify task dependencies:

  1. A dependencies block in a task’s Frontmatter, where you can pass a list of task ids in the current dag upon which the current task depends.

  2. An external_dependencies block in a task’s Frontmatter, where you can pass a list of dag_id: task_id combinations for tasks in other dags upon which the current task depends.

  3. A dependencies attribute on your custom operator, which is a list of task ids in the current dag upon which the current task depends. This powerful option allows task dependencies to be generated dynamically and automatically.

In this section, we’ll focus on the dependencies and external_dependencies blocks, available for use in any Task Definition File’s Frontmatter.

We’ll continue using our hello_dag example from the previous chapter.

2.1 Dependencies

Let’s say that our hello task depended on our hi task running before it. To specify this dependency, we would add the hi task to a list in the dependencies block of the hello.yml Task Definition File:

operator: airflow.operators.bash.BashOperator
dependencies:
  - hi
bash_command: echo hello

Now, in our Airflow UI, our DAG graph will show that hi precedes hello.

Remember, in gusty, the file name (minus the file extension) becomes the task id, so you do not need to specify hi.py, just hi.

You can list as many dependencies as you need to for any task.
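
To make the ordering concrete, here is a minimal plain-Python sketch (not gusty’s actual implementation) of how file names can become task ids and how a dependencies block translates into a run order. The task_files dictionary and the task_id_from_file helper are hypothetical, introduced only for illustration.

```python
from graphlib import TopologicalSorter
from pathlib import Path


def task_id_from_file(file_name: str) -> str:
    """Drop the file extension, mirroring the file-name-to-task-id rule."""
    return Path(file_name).stem


# Hypothetical stand-in for parsed Frontmatter: file name -> dependencies list.
task_files = {
    "hi.py": [],
    "hello.yml": ["hi"],
}

# Map each task id to the set of task ids that must run before it.
graph = {task_id_from_file(name): set(deps) for name, deps in task_files.items()}

# static_order() yields predecessors before the tasks that depend on them.
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)  # ['hi', 'hello']
```

In a real project Airflow resolves this ordering for you; the sketch only shows why listing hi under dependencies is enough to place it before hello.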

2.2 External Dependencies

A common pattern in Airflow is to have tasks in one DAG depend on tasks in another DAG, or to have one DAG depend completely on another DAG. This behavior is possible in gusty by using the external_dependencies block. The external_dependencies block accepts a list of key-value pairs where each key is a DAG id and each value is a task id.

For each key-value pair listed in the external_dependencies block, gusty will generate an ExternalTaskSensor, a built-in Airflow sensor, and place the resulting sensor task upstream of the given dependent task. If the same external dependency is specified across multiple tasks, gusty will only create one sensor and place this one sensor upstream of all tasks with the specified external dependency.

There are a few ways to configure external dependencies, and we’ll look at all of them below.

Single Task External Dependency

Let’s keep building up our hello.yml Task Definition File.

To specify that our hello task depends on an upstream task, which we’ll call upstream_task, in an upstream DAG, which we’ll call upstream_dag, we add the following external_dependencies block:

operator: airflow.operators.bash.BashOperator
dependencies:
  - hi
external_dependencies:
  - upstream_dag: upstream_task
bash_command: echo hello

The result will be a new ExternalTaskSensor task with the task id wait_for_upstream_dag_upstream_task, preceding the existing hello task.

As with dependencies, you can list as many external dependencies as you require.

Whole DAG External Dependency

An alternative to specifying a single task for an external dependency is to specify that the entire upstream DAG is the dependency. In this case, we use the special keyword all to configure the ExternalTaskSensor to wait for the entire DAG:

operator: airflow.operators.bash.BashOperator
dependencies:
  - hi
external_dependencies:
  - upstream_dag: all
bash_command: echo hello

The result will be a new ExternalTaskSensor task with the task id wait_for_DAG_upstream_dag, preceding the existing hello task.
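
The sensor naming and de-duplication behavior described in this section can be sketched in plain Python. This is only an illustration under stated assumptions, not gusty’s actual code: the sensor_task_id and plan_sensors helpers and the goodbye task are hypothetical, and the task id patterns are inferred from the two examples shown above.

```python
def sensor_task_id(dag_id: str, task_id: str) -> str:
    """Build a sensor task id following the patterns shown in the examples."""
    if task_id == "all":
        return f"wait_for_DAG_{dag_id}"
    return f"wait_for_{dag_id}_{task_id}"


def plan_sensors(external_dependencies: dict) -> dict:
    """Map each unique sensor task id to the tasks placed downstream of it.

    `external_dependencies` maps a task id to a list of
    (external_dag_id, external_task_id) pairs.
    """
    sensors = {}
    for task_id, pairs in external_dependencies.items():
        for dag_id, external_task_id in pairs:
            sensor_id = sensor_task_id(dag_id, external_task_id)
            # Reuse the sensor if another task already asked for it.
            sensors.setdefault(sensor_id, []).append(task_id)
    return sensors


plan = plan_sensors(
    {
        "hello": [("upstream_dag", "upstream_task"), ("upstream_dag", "all")],
        "goodbye": [("upstream_dag", "upstream_task")],  # hypothetical task
    }
)
for sensor_id, downstream in plan.items():
    print(sensor_id, "->", downstream)
```

Because both hello and goodbye name the same upstream_dag: upstream_task pair, the sketch produces a single wait_for_upstream_dag_upstream_task entry with both tasks downstream of it.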

External Dependencies in METADATA.yml

As an Airflow project grows, you might find that more and more of your tasks have the same external dependency, or sometimes DAGs just logically should depend on one another (e.g. a DAG that ingests data should precede a DAG that transforms that data). For these cases, you can use the exact same external_dependencies block in any METADATA.yml file.

When you specify an external dependency in a METADATA.yml file, the ExternalTaskSensor task will be placed at the root of the DAG, ensuring that no tasks in the DAG run before the ExternalTaskSensor task completes.

Here’s what it would look like if we took the same external dependency from above and placed it in an external_dependencies block in METADATA.yml instead:

The ExternalTaskSensor now precedes every other task in the graph.

Offset Schedules

Understandably, but frustratingly, the default behavior of Airflow’s ExternalTaskSensor is to look for DAG runs that have run at the same “logical date”. Suppose one DAG, earlier_dag, is scheduled to run daily at 00:00 UTC ("0 0 * * *"), and another DAG, later_dag, is scheduled to run daily at 06:00 UTC ("0 6 * * *"). If later_dag depends on earlier_dag, the default syntax for an external_dependencies block will not work, because the ExternalTaskSensor in later_dag will be looking for a 06:00 UTC DAG run of earlier_dag, which does not exist.

Fortunately, the external_dependencies block accepts an alternative syntax for this scenario, where:

  • The keys under external_dependencies are the external DAG ids.

  • A tasks list is provided for a given external DAG.

  • Additional configuration for the ExternalTaskSensor class, such as the execution_delta, can be passed in.

For example, to configure later_dag (06:00 UTC) to depend on earlier_dag (00:00 UTC), we could add the following block to later_dag’s METADATA.yml:

external_dependencies:
  earlier_dag:
    execution_delta: !timedelta
      hours: 6
    tasks:
      - all

This will ensure the resulting wait_for_DAG_earlier_dag looks for a successful earlier_dag DAG run at 00:00 UTC (later_dag’s 06:00 UTC run minus 6 hours).
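
The date arithmetic behind execution_delta can be checked with the standard library alone. The sketch below assumes, per the description above, that the sensor looks for the external DAG run whose logical date equals the current logical date minus execution_delta; the specific date used is arbitrary.

```python
from datetime import datetime, timedelta, timezone

# Logical date of a later_dag run scheduled with "0 6 * * *" (date is arbitrary).
later_dag_logical_date = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)

# Without an offset, the sensor looks for the same logical date in earlier_dag,
# but earlier_dag ("0 0 * * *") only has runs at 00:00 UTC.
default_target = later_dag_logical_date

# With execution_delta, the sensor looks back by the given amount.
execution_delta = timedelta(hours=6)
offset_target = later_dag_logical_date - execution_delta

print(default_target.isoformat())  # 2024-01-01T06:00:00+00:00
print(offset_target.isoformat())   # 2024-01-01T00:00:00+00:00
```

The second value lines up with earlier_dag’s 00:00 UTC schedule, which is why the 6 hour delta lets the sensor find a matching run.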

Alternative Approaches to Offset Schedules

Custom Sensors

It’s possible to create a custom sensor that “doesn’t care” about the logical date, and just looks at the last/latest DAG run. This ensures you don’t have to worry about setting any offset schedules.

Here is a small snippet inspired by the cal-itp/data-infra repo (the original has since been deleted from that repo):

from airflow.utils.db import provide_session
from airflow.sensors.external_task_sensor import ExternalTaskSensor


class LastDagRunSensor(ExternalTaskSensor):
    """Wait on the most recent run of an external DAG, ignoring schedule offsets."""

    def __init__(self, external_dag_id, external_task_id=None, **kwargs):
        super().__init__(
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            **kwargs,
        )

        # The current logical date is passed in but deliberately ignored;
        # the external DAG's latest run is returned instead.
        def dag_last_exec(crnt_dttm):
            return self.get_dag_last_execution_date(self.external_dag_id)

        self.execution_date_fn = dag_last_exec

    @provide_session
    def get_dag_last_execution_date(self, dag_id, session=None):
        from airflow.models import DagModel

        # Look up the external DAG by the id that was passed in.
        q = session.query(DagModel).filter(DagModel.dag_id == dag_id)

        dag = q.first()
        return dag.get_last_dagrun().logical_date

In the event you wanted to use this LastDagRunSensor as the sensor class for the external dependencies in your gusty DAG, you could do so by using the wait_for_class argument available in create_dag. For example, here’s what your later_dag.py DAG file might look like if you decided to do so:

import os
from gusty import create_dag
# Wherever you store the code for the above sensor..
from plugins.sensors import LastDagRunSensor

later_dag_dir = os.path.join(
  os.environ["AIRFLOW_HOME"], 
  "dags", 
  "later_dag")

later_dag = create_dag(
  later_dag_dir, 
  wait_for_class=LastDagRunSensor,
  latest_only=False)

Now all of the external dependencies defined in the later_dag’s Task Definition Files will use the custom LastDagRunSensor instead of the default ExternalTaskSensor.

Other External Dependency Considerations

You can configure your external dependencies further using the wait_for_defaults argument in create_dag, which accepts a dictionary of arguments that are available to Airflow’s ExternalTaskSensor. Here is the subset of parameters available in wait_for_defaults:

  • poke_interval
  • timeout
  • retries
  • mode
  • soft_fail
  • execution_delta
  • execution_date_fn
  • check_existence

Additionally, anything available to BaseOperator will be passed through.

Set mode to reschedule

By default in Airflow, sensors run in mode="poke", which means they take up a worker slot for the entire time they are waiting for the external task/DAG to complete. You can set mode="reschedule" to free up the worker slot in between “pokes”. Building on the create_dag call in later_dag.py above:

later_dag = create_dag(
  later_dag_dir, 
  wait_for_class=LastDagRunSensor,
  wait_for_defaults={
    "mode": "reschedule"
    },
  latest_only=False)

Set a timeout

By default in gusty, external dependencies will time out after 1 hour, or 3600 seconds. If you want to wait longer, you can set your own timeout, in seconds:

later_dag = create_dag(
  later_dag_dir, 
  wait_for_class=LastDagRunSensor,
  wait_for_defaults={
    "mode": "reschedule",
    "timeout": 7200 # 2 hours in seconds
    },
  latest_only=False)
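
If hard-coded second counts feel error-prone, one option is to derive them with datetime.timedelta. This is a small sketch of building the wait_for_defaults dictionary on its own; only mode and timeout from the list above are used, and the variable names are illustrative.

```python
from datetime import timedelta

# timeout is expressed in seconds, so convert from a readable duration.
two_hours_in_seconds = int(timedelta(hours=2).total_seconds())

wait_for_defaults = {
    "mode": "reschedule",
    "timeout": two_hours_in_seconds,
}
print(wait_for_defaults["timeout"])  # 7200
```

The resulting dictionary can then be passed as the wait_for_defaults argument in the create_dag call shown above.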

Learn More

If you want to learn more about sensors, check out Airflow’s BaseSensorOperator and Airflow’s BaseOperator.