7 Custom Operators
Custom operators are a great way to get even more out of gusty. Two great use cases for custom operators are:
- Auto-detecting dependencies: Have your tasks depend on one another without having to explicitly set a dependencies block. This is very useful for SQL tasks, and can allow you to achieve a dbt-like dependency graph.
- Running notebooks: Just take a notebook and run it as a pipeline task. It’s pretty much that simple!
7.1 Auto-detecting Dependencies
How It Works
- In gusty, the task name is the file name.
- gusty (optionally) makes available to your custom operators a task_id, which is the task name. Just specify task_id as an argument in the operator’s __init__ method, and gusty will pass it in when building your task.
- If you name your SQL tables after the task_id, you can detect table names in your SQL, which in turn can be leveraged as a list of dependencies.
- If you attach this list of dependencies as an attribute named dependencies on your custom operator, gusty automatically wires up these dependencies for you.
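The "pass task_id only if the operator asks for it" idea can be sketched in plain Python. This is an illustration only, not gusty's implementation: build_task, SqlTask, and PlainTask are hypothetical names, and deriving the task name with Path.stem is an assumption based on the file name being the task name.

```python
import inspect
from pathlib import Path


def build_task(operator_class, file_path, **spec):
    """Hypothetical builder: supply task_id only when __init__ declares it."""
    accepted = inspect.signature(operator_class.__init__).parameters
    extras = {}
    if "task_id" in accepted:
        # Assumption: the task name is the file name without its extension
        extras["task_id"] = Path(file_path).stem
    return operator_class(**spec, **extras)


class SqlTask:
    """Hypothetical operator that opts in to receiving task_id."""

    def __init__(self, task_id, sql):
        self.task_id = task_id
        self.sql = sql


class PlainTask:
    """Hypothetical operator that does not ask for task_id."""

    def __init__(self, sql):
        self.sql = sql


task = build_task(SqlTask, "dags/my_dag/users.sql", sql="SELECT 1")
plain = build_task(PlainTask, "dags/my_dag/users.sql", sql="SELECT 1")
print(task.task_id)
```

Here task.task_id ends up as the file name without the .sql extension, while PlainTask is built without any task_id argument at all.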
Let’s make an example custom SQL operator that takes advantage of this.
Example Operator
To make our custom operator we will use:
- The PostgresOperator from Airflow’s Postgres Provider, as the parent class for our custom operator.
- The Parser from the sql-metadata package, for detecting table names.
A common purpose of SQL tasks is to create tables, so we will have our users provide SELECT statements, and will wrap their statements in a CREATE OR REPLACE TABLE statement from within the operator.
This custom operator can be stored in our Airflow plugins folder, maybe under plugins/custom_operators/__init__.py. We’ll store a function for detecting tables, detect_tables, in this file as well, for this example.
from sql_metadata import Parser
from airflow.providers.postgres.operators.postgres import PostgresOperator

# ---------------------- #
# Detect Tables Function #
# ---------------------- #

def detect_tables(sql):
    """Detects tables in a sql query."""

    # Remove any Jinja syntax to improve table detection
    jinjaless_sql = sql.replace("{{", "").replace("}}", "")

    # Can return "schema.table", but we just want the "table"
    tables_raw = Parser(jinjaless_sql).tables

    # Only take "table" if "schema.table" is in tables_raw
    tables = [t.split('.')[-1] for t in tables_raw]

    return tables

# --------------- #
# Custom Operator #
# --------------- #

class CustomPostgresOperator(PostgresOperator):

    def __init__(
            self,
            # gusty automatically passes in task_id when creating the task
            task_id,
            schema,
            sql,
            postgres_conn_id="postgres_default",
            params=None,
            **kwargs):

        # gusty uses self.dependencies to create task dependencies
        self.dependencies = detect_tables(sql)

        # Always name your table after the task_id / file name
        table = task_id

        create_sql = f"CREATE OR REPLACE TABLE {schema}.{table} AS ({sql})"

        super(CustomPostgresOperator, self).__init__(
            task_id=task_id,
            sql=create_sql,
            postgres_conn_id=postgres_conn_id,
            params=params,
            **kwargs)
Example Usage
users Table
Now that we have our custom operator, we can invoke it in a Task Definition File. Let’s use this custom operator to create a users table, in a Task Definition File named users.sql.
---
operator: plugins.custom_operators.CustomPostgresOperator
schema: app_data
---
SELECT
    id AS user_id,
    created_at
FROM raw_data.users_raw
Per our custom operator, this will create a users table in our app_data schema.
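To make the wrapping step concrete, the snippet below repeats only the string-building part of the operator’s __init__, outside of Airflow. It assumes the file name users.sql becomes the task_id, that schema comes from the YAML front matter, and that the SELECT statement arrives as the sql argument; the SQL is collapsed onto one line for readability.

```python
# Values assumed to reach the operator for users.sql
task_id = "users"    # from the file name
schema = "app_data"  # from the YAML front matter
sql = "SELECT id AS user_id, created_at FROM raw_data.users_raw"

# Same wrapping logic as in CustomPostgresOperator.__init__
table = task_id
create_sql = f"CREATE OR REPLACE TABLE {schema}.{table} AS ({sql})"

print(create_sql)
```

The printed statement is what the parent operator would receive as its sql argument under these assumptions.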
new_users Table
Now we can make a second Task Definition File, new_users.sql, which references the users table.
---
operator: plugins.custom_operators.CustomPostgresOperator
schema: app_data
---
SELECT
    user_id
FROM app_data.users
WHERE DATE(created_at) = CURRENT_DATE()
In our Airflow DAG graph, the new_users task now automatically depends on the users task!
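The resulting graph can be approximated without Airflow. The sketch below is not gusty’s code: naive_detect_tables is a hypothetical regex stand-in for sql_metadata’s Parser that only looks after FROM and JOIN, and the rule that only names matching another task become upstream tasks is an assumption made for illustration.

```python
import re

# Hypothetical, simplified stand-in for a real SQL parser:
# it only picks up identifiers that directly follow FROM or JOIN.
TABLE_PATTERN = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)


def naive_detect_tables(sql):
    """Return bare table names (schema stripped) referenced in a query."""
    return [name.split(".")[-1] for name in TABLE_PATTERN.findall(sql)]


# The two Task Definition Files, keyed by task_id (file name without ".sql")
task_sql = {
    "users": "SELECT id AS user_id, created_at FROM raw_data.users_raw",
    "new_users": (
        "SELECT user_id FROM app_data.users "
        "WHERE DATE(created_at) = CURRENT_DATE()"
    ),
}

# Assumption: only detected names that match another task become upstream tasks
upstream = {
    task_id: [t for t in naive_detect_tables(sql) if t in task_sql and t != task_id]
    for task_id, sql in task_sql.items()
}

print(upstream)  # {'users': [], 'new_users': ['users']}
```

The users_raw table is detected in users.sql but matches no task, so only new_users picks up an upstream task.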
7.2 Running Notebooks
How It Works
- Just like task_id is a special keyword you can add to your custom operator’s __init__ method, so is file_path. file_path will be an absolute path to your Task Definition File, in this case a Jupyter Notebook.
- Declare a YAML cell in your Jupyter Notebook, specifying the operator that should run the cell.
- Have your custom operator run the cell.
Example Operator
We’ll more or less take this example directly from the gusty demo.
To make our custom operator we will use:
The built-in BashOperator, for running the command that renders the notebook.
The nbconvert package to render the notebook.
Our operator will simply execute the notebook, render it as HTML, and then delete the rendered HTML file.
from airflow.operators.bash_operator import BashOperator

command_template = """
jupyter nbconvert --to html --execute {file_path} || exit 1; rm -f {rendered_output}
"""

class JupyterOperator(BashOperator):
    """
    The JupyterOperator executes the Jupyter Notebook.
    Note that it is up to the notebook itself to handle connecting
    to a database. (But it can grab this from Airflow connections)
    """

    def __init__(
            self,
            # gusty automatically passes in file_path when creating the task
            file_path,
            *args,
            **kwargs):

        self.file_path = file_path
        self.rendered_output = self.file_path.replace('.ipynb', '.html')

        command = command_template.format(file_path=self.file_path,
                                          rendered_output=self.rendered_output)

        super(JupyterOperator, self).__init__(bash_command=command, *args, **kwargs)
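To see what the operator hands to bash, the command can be rendered on its own. The notebook path below is hypothetical; gusty would pass the real absolute path as file_path.

```python
command_template = """
jupyter nbconvert --to html --execute {file_path} || exit 1; rm -f {rendered_output}
"""

# Hypothetical absolute path to a notebook Task Definition File
file_path = "/path/to/dags/my_dag/my_notebook.ipynb"
rendered_output = file_path.replace(".ipynb", ".html")

command = command_template.format(
    file_path=file_path,
    rendered_output=rendered_output,
)

print(command.strip())
```

Because of the `|| exit 1`, a notebook that fails to execute fails the task before the cleanup step runs; otherwise the rendered HTML file is removed.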
Example Usage
See the Jupyter Notebook Task Definition File example in the gusty demo, in the stock_predictions DAG. Notice how the first cell in the notebook is a YAML cell (see Raw notebook).