6  Multi-tasking

Sometimes orchestration involves repetition. For example, you might have a DAG with three tasks that fetch stock data for three different stock symbols. To create this DAG, you’d likely use a for loop. You can achieve the same for-loop-style task generation within gusty using two special frontmatter blocks: multi_task_spec and python_callable_partials.

In both multi_task_spec and python_callable_partials, each key directly under the block becomes the task id of one task, and the arguments nested under that task id are passed to the operator (for multi_task_spec) or to the callable (for python_callable_partials).

We’ll look at examples of each below.

6.1 multi_task_spec

Imagine we want three BashOperator tasks to echo “hi”, “hey”, and “hello world”.

We can define all three tasks in a single Task Definition File, which we’ll call multi_greeting.yml. The name of the Task Definition File is arbitrary. Here are its contents:

operator: airflow.operators.bash.BashOperator
bash_command: echo $GREETING
multi_task_spec:
  say_hi:
    env:
      GREETING: hi
  say_hey:
    env:
      GREETING: hey
  say_hello_world:
    env:
      GREETING: hello
    bash_command: echo $GREETING world

The above Task Definition File will create three tasks: say_hi, say_hey, and say_hello_world. The tasks say_hi and say_hey both inherit the top-level bash_command but have different env arguments. The say_hello_world task also has its own env argument, and goes a step further by defining its own bash_command, which replaces the top-level one.

This multi_task_spec produces a graph containing three tasks: say_hi, say_hey, and say_hello_world.

This powerful syntax allows you to keep your task definitions DRY: arguments shared by every task are written once at the top level, and each task id lists only what is unique to it or what it overrides. Very flexible!
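The inheritance behavior described in this section can be pictured as a dictionary merge. The following is a minimal sketch of that idea in plain Python, not gusty's actual implementation; the helper name expand_multi_task_spec is hypothetical, and the spec dictionary simply mirrors the YAML above after parsing.

```python
def expand_multi_task_spec(spec):
    """Return {task_id: merged_arguments} for a parsed Task Definition File."""
    # Everything outside multi_task_spec is shared by all tasks.
    shared = {k: v for k, v in spec.items() if k != "multi_task_spec"}
    tasks = {}
    for task_id, overrides in spec.get("multi_task_spec", {}).items():
        # Task-level arguments take precedence over shared ones.
        tasks[task_id] = {**shared, **overrides}
    return tasks


spec = {
    "operator": "airflow.operators.bash.BashOperator",
    "bash_command": "echo $GREETING",
    "multi_task_spec": {
        "say_hi": {"env": {"GREETING": "hi"}},
        "say_hey": {"env": {"GREETING": "hey"}},
        "say_hello_world": {
            "env": {"GREETING": "hello"},
            "bash_command": "echo $GREETING world",
        },
    },
}

for task_id, args in expand_multi_task_spec(spec).items():
    print(task_id, "|", args["bash_command"], "|", args["env"])
```

Printing the merged arguments makes it easy to check which values each task inherited and which it overrode.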

6.2 python_callable_partials

Similar to multi_task_spec, python_callable_partials allows you to generate multiple tasks in a single file, except instead of passing arguments to an operator, you pass arguments directly to a python_callable.

In the example Task Definition File below, we’ll create three tasks that use yfinance to fetch the past year’s stock data for three different stock symbols: AMZN, GOOG, and MSFT.

# ---
# python_callable: main
# python_callable_partials:
#   get_amzn:
#     symbol: AMZN
#   get_goog:
#     symbol: GOOG
#   get_msft:
#     symbol: MSFT
# ---

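def main(symbol):
    # Import inside the function so yfinance is only needed when the task runs.
    from yfinance import Ticker

    stock = Ticker(symbol)
    # One year of daily price history, with the date index turned into a column.
    history = stock.history(period="1y", interval="1d").reset_index()
    history["Symbol"] = symbol
    print(history.head())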

The above Task Definition File will create three tasks: get_amzn, get_goog, and get_msft. Each task will have its respective symbol passed to the main function.
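Conceptually, each entry under python_callable_partials behaves like a partially applied function: the same callable with a different set of keyword arguments bound in advance. Here is a rough sketch of that idea using functools.partial from the standard library. Whether gusty uses functools.partial internally is an assumption, and main here is a stand-in that returns a string instead of calling yfinance, so the snippet runs offline.

```python
from functools import partial


def main(symbol):
    # Stand-in for the real task body; no network call is made here.
    return f"fetching one year of daily prices for {symbol}"


# Mirrors the python_callable_partials block from the frontmatter above.
python_callable_partials = {
    "get_amzn": {"symbol": "AMZN"},
    "get_goog": {"symbol": "GOOG"},
    "get_msft": {"symbol": "MSFT"},
}

# One callable per task id, each with its own keyword arguments pre-bound.
callables = {
    task_id: partial(main, **kwargs)
    for task_id, kwargs in python_callable_partials.items()
}

for task_id, fn in callables.items():
    print(task_id, "->", fn())
```

Adding a fourth stock symbol would then mean adding one more key to the dictionary, which is the same edit you would make in the frontmatter.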

6.3 Mixing It Up

multi_task_spec and python_callable_partials are not mutually exclusive, so you can mix and match configuration as needed.

Let’s build upon our yfinance example. Instead of using the default PythonOperator, let’s use the PythonVirtualenvOperator, and set requirements for our get_amzn task only.

# ---
# operator: airflow.operators.python.PythonVirtualenvOperator
# python_callable: main
# python_callable_partials:
#   get_amzn:
#     symbol: AMZN
#   get_goog:
#     symbol: GOOG
#   get_msft:
#     symbol: MSFT
# multi_task_spec:
#   get_amzn:
#     requirements:
#       - yfinance==0.1.96
# ---

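def main(symbol):
    # PythonVirtualenvOperator runs the callable in a separate virtual
    # environment, so imports must happen inside the function.
    from yfinance import Ticker

    stock = Ticker(symbol)
    # One year of daily price history, with the date index turned into a column.
    history = stock.history(period="1y", interval="1d").reset_index()
    history["Symbol"] = symbol
    print(history.head())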

In the above example, we changed two things:

  • We are now explicitly using the PythonVirtualenvOperator via the operator entry.

  • Our get_amzn task also gets an entry in the multi_task_spec block, specifying a list of requirements just for our get_amzn task.

With both multi_task_spec and python_callable_partials working together, you can pretty much iterate over anything!
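To make the division of labor concrete, here is a minimal sketch of how the two blocks could be combined per task id: multi_task_spec contributes operator arguments, and python_callable_partials contributes arguments for the callable. This is an illustration in plain Python, not gusty's implementation. The helper name expand_task_definition, the operator_args and callable_kwargs keys, and the choice to take the union of task ids from both blocks are all assumptions.

```python
def expand_task_definition(spec):
    """Return {task_id: {"operator_args": ..., "callable_kwargs": ...}}."""
    reserved = {"multi_task_spec", "python_callable_partials"}
    # Top-level arguments are shared by every generated task.
    shared = {k: v for k, v in spec.items() if k not in reserved}
    overrides = spec.get("multi_task_spec", {})
    partials = spec.get("python_callable_partials", {})

    tasks = {}
    # dict.fromkeys removes duplicate task ids while keeping first-seen order.
    for task_id in dict.fromkeys([*partials, *overrides]):
        tasks[task_id] = {
            # Operator arguments: shared values plus task-specific overrides.
            "operator_args": {**shared, **overrides.get(task_id, {})},
            # Keyword arguments destined for the python_callable.
            "callable_kwargs": partials.get(task_id, {}),
        }
    return tasks


spec = {
    "operator": "airflow.operators.python.PythonVirtualenvOperator",
    "python_callable": "main",
    "python_callable_partials": {
        "get_amzn": {"symbol": "AMZN"},
        "get_goog": {"symbol": "GOOG"},
        "get_msft": {"symbol": "MSFT"},
    },
    "multi_task_spec": {
        "get_amzn": {"requirements": ["yfinance==0.1.96"]},
    },
}

for task_id, parts in expand_task_definition(spec).items():
    print(task_id, parts["callable_kwargs"], parts["operator_args"].get("requirements"))
```

In this sketch only get_amzn ends up with a requirements argument, while all three tasks share the same operator and receive their own symbol.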