6 Multi-tasking
Sometimes orchestration involves repetition. For example, you might have a DAG with 3 different tasks that fetch stock data for 3 different stock symbols. To create this DAG, you’d likely use a for loop. You can achieve this for loop style task generation within gusty using some special frontmatter blocks:
- multi_task_spec: For iterating over arguments to be passed to an operator.
- python_callable_partials: For iterating over arguments to be passed to the function assigned to a python_callable argument.
In both multi_task_spec and python_callable_partials, the keys below each block will be the task id for a given task, and the arguments below each task id will be passed to that operator or callable, respectively.
We’ll look at examples of each below.
6.1 multi_task_spec
Imagine we want three BashOperator tasks to echo “hi”, “hey”, and “hello world”.
We can define all three tasks in a single Task Definition File, which we’ll call multi_greeting.yml. The name of the Task Definition File is arbitrary. Here are its contents:
operator: airflow.operators.bash.BashOperator
bash_command: echo $GREETING
multi_task_spec:
    say_hi:
        env:
            GREETING: hi
    say_hey:
        env:
            GREETING: hey
    say_hello_world:
        env:
            GREETING: hello
        bash_command: echo $GREETING world
The above Task Definition File will create three tasks: say_hi, say_hey, and say_hello_world. The tasks say_hi and say_hey both inherit the same bash_command, but have different env arguments. The say_hello_world task also contains its own env argument, but goes a step further and defines its own bash_command.
This multi_task_spec produces the following graph:
[Figure: graph of the say_hi, say_hey, and say_hello_world tasks]
This syntax lets you keep your task definitions DRY. In this example, every task has its own env argument. The say_hi and say_hey tasks share the common bash_command defined at the top of the file, while say_hello_world overrides it with its own. Very flexible!
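To make the inherit-and-override behavior concrete, here is a minimal Python sketch of how a spec like this could be expanded into per-task arguments. It is an illustration only, not gusty's actual implementation: expand_multi_task_spec is a hypothetical helper, and the shallow dictionary merge is an assumption based on the behavior described above.

```python
def expand_multi_task_spec(spec):
    """Return {task_id: kwargs} by layering each task's entries over the shared ones.

    Hypothetical helper for illustration; gusty's real logic may differ.
    """
    # Everything outside the multi_task_spec block is treated as shared defaults.
    shared = {k: v for k, v in spec.items() if k != "multi_task_spec"}
    tasks = {}
    for task_id, overrides in spec["multi_task_spec"].items():
        # Shallow merge: task-level keys win over shared keys.
        tasks[task_id] = {**shared, **overrides}
    return tasks


# The same content as the Task Definition File above, written as a dict.
spec = {
    "operator": "airflow.operators.bash.BashOperator",
    "bash_command": "echo $GREETING",
    "multi_task_spec": {
        "say_hi": {"env": {"GREETING": "hi"}},
        "say_hey": {"env": {"GREETING": "hey"}},
        "say_hello_world": {
            "env": {"GREETING": "hello"},
            "bash_command": "echo $GREETING world",
        },
    },
}

tasks = expand_multi_task_spec(spec)
for task_id, kwargs in tasks.items():
    print(task_id, kwargs["bash_command"], kwargs["env"])
```

In this sketch, say_hi and say_hey end up with the shared bash_command, and say_hello_world ends up with its own.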
6.2 python_callable_partials
Similar to multi_task_spec, python_callable_partials allows you to generate multiple tasks in a single file, except instead of passing arguments to an operator, you pass arguments directly to a python_callable.
In the example Task Definition File below, we’ll create a few tasks to fetch the past year’s stock data from yfinance for three different stock symbols: AMZN, GOOG, and MSFT.
# ---
# python_callable: main
# python_callable_partials:
#     get_amzn:
#         symbol: AMZN
#     get_goog:
#         symbol: GOOG
#     get_msft:
#         symbol: MSFT
# ---
def main(symbol):
    from yfinance import Ticker

    stock = Ticker(symbol)
    history = stock.history(period='1y', interval='1d').reset_index()
    history["Symbol"] = symbol
    print(history.head())
The above Task Definition File will create three tasks: get_amzn, get_goog, and get_msft. Each task will have its respective symbol passed to the main function.
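Conceptually, each entry behaves like a partially applied function. The sketch below uses functools.partial from the Python standard library to show the idea. It is an analogy, not a description of gusty's internals, and the stub main simply returns a string instead of calling yfinance.

```python
from functools import partial


def main(symbol):
    # Stand-in for the real callable; returns a string instead of fetching data.
    return f"fetching {symbol}"


# Mirrors the python_callable_partials block: task id -> keyword arguments.
python_callable_partials = {
    "get_amzn": {"symbol": "AMZN"},
    "get_goog": {"symbol": "GOOG"},
    "get_msft": {"symbol": "MSFT"},
}

# Build one zero-argument callable per task id.
callables = {
    task_id: partial(main, **kwargs)
    for task_id, kwargs in python_callable_partials.items()
}

# Calling each one runs main with that task's symbol.
results = {task_id: fn() for task_id, fn in callables.items()}
print(results)
```

Each task id maps to the same function with a different symbol already bound, which is the for loop pattern expressed as data.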
6.3 Mixing It Up
multi_task_spec and python_callable_partials are non-exclusive, so you can mix and match configuration as needed.
Let’s build upon our yfinance example, and instead of using the default PythonOperator, let’s use the PythonVirtualenvOperator, and change the requirements for our get_amzn task.
# ---
# operator: airflow.operators.python.PythonVirtualenvOperator
# python_callable: main
# python_callable_partials:
#     get_amzn:
#         symbol: AMZN
#     get_goog:
#         symbol: GOOG
#     get_msft:
#         symbol: MSFT
# multi_task_spec:
#     get_amzn:
#         requirements:
#             - yfinance==0.1.96
# ---
def main(symbol):
    from yfinance import Ticker

    stock = Ticker(symbol)
    history = stock.history(period='1y', interval='1d').reset_index()
    history["Symbol"] = symbol
    print(history.head())
In the above example, we changed two things:
- We are now explicitly using the PythonVirtualenvOperator via the operator entry.
- Our get_amzn task also gets an entry in the multi_task_spec block, specifying a list of requirements just for our get_amzn task.
With both multi_task_spec and python_callable_partials working together, you can pretty much iterate over anything!