7  Custom Operators

Custom operators are a great way to get even more out of gusty. Two great use cases for custom operators are:

  • Auto-detecting dependencies from a task's SQL query.

  • Running Jupyter Notebooks.

7.1 Auto-detecting Dependencies

How It Works

  1. In gusty, the task name is the file name.

  2. gusty (optionally) makes available to your custom operators a task_id, which is the task name. Just specify task_id as an argument in the operator’s __init__ method, and gusty will pass it in when building your task.

  3. If you name your SQL tables after the task_id, you can detect table names in your SQL, which in turn can be leveraged as a list of dependencies.

  4. If you attach this list of dependencies as an attribute on your custom operator, gusty automatically wires up these dependencies for you.

Let’s make an example custom SQL operator that takes advantage of this.

Example Operator

To make our custom operator we will use:

  • The sql_metadata package, whose Parser detects table names in SQL queries.

  • The PostgresOperator, from the Postgres provider package, as a base class.

A common purpose of SQL tasks is to create tables, so we will have our users provide SELECT statements, and the operator will wrap each statement in SQL that drops and recreates the target table (Postgres does not support CREATE OR REPLACE TABLE).

This custom operator can be stored in our Airflow plugins folder, perhaps at plugins/custom_operators/__init__.py. For this example, we'll also store the table-detection function, detect_tables, in the same file.

from sql_metadata import Parser
from airflow.providers.postgres.operators.postgres import PostgresOperator

# ---------------------- #
# Detect Tables Function #
# ---------------------- #

def detect_tables(sql):
    """Detects tables in a sql query."""

    # Remove any Jinja syntax to improve table detection
    jinjaless_sql = sql.replace("{{", "").replace("}}", "")

    # Can return "schema.table", but we just want the "table"
    tables_raw = Parser(jinjaless_sql).tables

    # Only take "table" if "schema.table" is in tables_raw
    tables = [t.split('.')[-1] for t in tables_raw]

    return tables
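
As a quick sanity check, here is how detect_tables should behave on the new_users query we write later in this article (expected output shown as a comment):

detect_tables("SELECT user_id FROM app_data.users WHERE DATE(created_at) = CURRENT_DATE")
# Parser finds "app_data.users"; we keep only the table name:
# ['users']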

# --------------- #
# Custom Operator #
# --------------- #

class CustomPostgresOperator(PostgresOperator):

    def __init__(
            self,
            # gusty automatically passes in task_id when creating the task
            task_id,
            schema,
            sql,
            postgres_conn_id="postgres_default",
            params=None,
            **kwargs):
        
        # gusty uses self.dependencies to create task dependencies
        self.dependencies = detect_tables(sql)
        
        # Always name your table after the task_id / file name
        table = task_id

        create_sql = f"CREATE OR REPLACE TABLE {schema}.{table} AS ({sql})"

        super(CustomPostgresOperator, self).__init__(
            task_id = task_id,
            sql = create_sql,
            postgres_conn_id = postgres_conn_id,
            params=params,
            **kwargs)

Example Usage

users Table

Now that we have our custom operator, we can invoke it in a Task Definition File. Let's use it to create a users table, in a Task Definition File named users.sql.

---
operator: plugins.custom_operators.CustomPostgresOperator
schema: app_data
---

SELECT
  id AS user_id,
  created_at
FROM raw_data.users_raw

Per our custom operator, this will create a users table in our app_data schema.
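
Given the f-string in our custom operator, the SQL that actually runs should look something like this (reformatted here for readability):

DROP TABLE IF EXISTS app_data.users;
CREATE TABLE app_data.users AS (
  SELECT
    id AS user_id,
    created_at
  FROM raw_data.users_raw
)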

new_users Table

Now we can make a second Task Definition File, new_users.sql, which references the users table.

---
operator: plugins.custom_operators.CustomPostgresOperator
schema: app_data
---

SELECT
  user_id
FROM app_data.users
WHERE DATE(created_at) = CURRENT_DATE

In our Airflow DAG graph, the new_users task now automatically depends on the users task!
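
In other words, gusty has inferred the dependency we would otherwise wire up by hand in a conventional DAG file. As a sketch, assuming users and new_users are the instantiated task objects:

# the hand-written equivalent of what gusty inferred for us
users >> new_users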

7.2 Running Notebooks

How It Works

  1. Just like task_id is a special keyword you can add to your custom operator’s __init__ method, so is file_path.

  2. file_path will be an absolute path to your Task Definition File, in this case a Jupyter Notebook.

  3. Declare a YAML cell in your Jupyter Notebook, specifying the operator that should run the notebook (see the example cell after this list).

  4. Have your custom operator run the notebook.
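
For reference, here is a minimal version of such a YAML cell, mirroring the front matter from our SQL Task Definition Files above. The operator path assumes the JupyterOperator we define below lives in plugins/custom_operators:

---
operator: plugins.custom_operators.JupyterOperator
---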

Example Operator

We’ll more or less take this example directly from the gusty demo.

To make our custom operator we will use:

  • The built-in BashOperator, for running the command that renders the notebook.

  • The nbconvert package to render the notebook.

Our operator will simply execute the notebook, rendering it to HTML in the process, and then delete the rendered HTML file.

from airflow.operators.bash import BashOperator

command_template = """
jupyter nbconvert --to html --execute {file_path} || exit 1; rm -f {rendered_output}
"""

class JupyterOperator(BashOperator):
    """
    The JupyterOperator executes the Jupyter Notebook.
    Note that it is up to the notebook itself to handle connecting
    to a database (though it can grab credentials from Airflow connections).
    """

    def __init__(
            self,
            # gusty automatically passes in file_path when creating the task
            file_path,
            *args,
            **kwargs):

        self.file_path = file_path
        self.rendered_output = self.file_path.replace('.ipynb', '.html')

        command = command_template.format(file_path=self.file_path,
                                          rendered_output=self.rendered_output)

        super().__init__(*args, bash_command=command, **kwargs)
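
To make the template concrete, here is the bash command the operator would run for a hypothetical file_path:

# hypothetical file_path: /usr/local/airflow/dags/stock_predictions/predict.ipynb
jupyter nbconvert --to html --execute /usr/local/airflow/dags/stock_predictions/predict.ipynb || exit 1; rm -f /usr/local/airflow/dags/stock_predictions/predict.html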

Example Usage

See the Jupyter Notebook Task Definition File example in the gusty demo, in the stock_predictions DAG. Notice how the first cell in the notebook is a YAML cell (see Raw notebook).