Apache Airflow is a workflow orchestration system that represents pipelines as Directed Acyclic Graphs (DAGs) of tasks. It doesn’t perform the actual compute. Instead, it determines what runs, when it runs, and in what order. Each task can trigger external systems such as Spark, Trino, and databases. NexusOne uses Airflow as the control layer for data workflows, such as coordinating ingestion, transformation, or validation. NexusOne also uses Airflow for platform automation across Iceberg, Spark, and Trino. When interacting with NexusOne using the portal, you can launch Airflow using the Build feature.

DAG

In Airflow, a DAG, or Directed Acyclic Graph, represents an entire workflow. It defines what tasks exist, when the workflow runs, and how it handles failures and retries. Each DAG is defined in a Python file. The DAG doesn’t perform any computation itself. It only defines the structure and scheduling of tasks.

Where DAGs live in NexusOne

In NexusOne, Airflow loads DAGs from the /opt/airflow/dags path, while you can author DAGs in JupyterHub under the /home/<user>/dags path. Any valid Python file placed in the Airflow DAGs directory is automatically detected and scheduled.
In the JupyterHub path, user refers to your NexusOne username.
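One caveat about automatic detection: in a default Airflow configuration, the scheduler only parses Python files whose text contains both "airflow" and "dag" (case-insensitive). This document doesn't state whether NexusOne keeps that default, so treat the following as a rough sketch of the heuristic rather than platform behavior. The function name passes_safe_mode is hypothetical.

```python
def passes_safe_mode(source: str) -> bool:
    """Approximate Airflow's default DAG discovery heuristic.

    Hypothetical helper: returns True when the file text mentions both
    "airflow" and "dag", ignoring case.
    """
    lowered = source.lower()
    return "airflow" in lowered and "dag" in lowered


dag_file = "from airflow.decorators import dag\n"
helper_file = "def add(a, b):\n    return a + b\n"

print(passes_safe_mode(dag_file))     # True
print(passes_safe_mode(helper_file))  # False
```

A file that fails this check is skipped silently under the default setting, which is a common reason a new DAG doesn't appear in the UI.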

Core DAG components

Every Airflow DAG consists of a set of core components that control how and when it runs. A combination of default arguments and DAG-level parameters defines these components and controls scheduling, fault tolerance, execution behavior, and platform-level workflows. The core components include:
  • dag_id: Unique identifier for the workflow.
  • owner: Team or system responsible for the DAG.
  • retries: Number of retry attempts for failed tasks.
  • retry_delay: Time to wait between retries.
  • start_date: Date from which the DAG becomes eligible to run.
  • schedule/schedule_interval: When the DAG should run.
  • catchup: Whether to backfill past runs or only run from now onward.
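To make retries and retry_delay concrete, the following minimal sketch computes how many attempts a task gets and how long it waits in total between attempts. It's plain Python arithmetic, not an Airflow API, and the helper name retry_budget is hypothetical.

```python
from datetime import timedelta


def retry_budget(retries: int, retry_delay: timedelta) -> tuple[int, timedelta]:
    """Return (maximum attempts, total time spent waiting between attempts)."""
    max_attempts = retries + 1          # the first try plus each retry
    total_wait = retries * retry_delay  # one delay precedes each retry
    return max_attempts, total_wait


attempts, wait = retry_budget(retries=2, retry_delay=timedelta(minutes=5))
print(attempts, wait)  # 3 0:10:00
```

With the values used throughout this page, a failing task runs at most three times and adds up to 10 minutes of waiting before Airflow marks it as failed.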

DAG owner and authorization

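Each DAG declares an owner. In Airflow, owner is a task-level argument, so set it through default_args in the @dag decorator rather than as a direct @dag parameter. This owner identifies the user, team, or service account under which the workflow executes.
from airflow.decorators import dag
from datetime import datetime

@dag(
  dag_id="example",
  default_args={"owner": "data-engineering"},
  start_date=datetime(2025, 1, 1),
  schedule="@daily"
)
def example_dag():
  ...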
A DAG only executes if the configured owner has the necessary permissions to access the required systems, such as data sources, Spark cluster, Iceberg tables, or external services.

DAG naming conventions

To ensure clarity and maintainability, DAGs should follow a consistent naming pattern.
<feature>_<domain>_<purpose>
A few examples are:
  • ingest_banking_files.py
  • transform_sales_daily.py
  • quality_customer_checks.py
  • platform_health_canary.py
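A naming pattern like this is easy to check automatically, for example in a pre-merge step. The following is a minimal sketch, assuming names are lowercase and have at least three underscore-separated segments. The function follows_convention and the exact regular expression are hypothetical, not a NexusOne rule.

```python
import re
from pathlib import Path

# Assumption: lowercase letters and digits, three or more segments joined by "_".
NAME_PATTERN = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+){2,}")


def follows_convention(filename: str) -> bool:
    """Check a DAG file name against <feature>_<domain>_<purpose>."""
    stem = Path(filename).stem  # drops a trailing ".py" if present
    return NAME_PATTERN.fullmatch(stem) is not None


for name in ("ingest_banking_files.py", "IngestBanking.py", "ingest_files.py"):
    print(name, follows_convention(name))
```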
Airflow discovers DAGs from a designated directory, commonly named dags/. All workflow definitions, such as Python files containing @dag or DAG objects, must live within this directory. The Airflow scheduler then picks up and schedules these definitions. Shared, reusable logic is typically stored in a utils/ subdirectory. Files in this utils/ directory contain common functions used across multiple DAGs, such as configuration loaders, Spark session helpers, validation logic, and API clients. A sample file tree:
dags/
├── ingestion/
├── transform/
├── quality/
├── monitoring/
└── utils/
    ├── io_helpers.py
    ├── spark_helpers.py
    └── quality_rules.py
DAG files can import functions from utils/, for example:
from utils.io_helpers import read_config
from utils.spark_helpers import build_spark_session
This separation keeps DAGs focused on orchestration logic while centralizing reusable code into shared modules.
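The body of read_config isn't shown in this document. As an illustration only, a helper in utils/io_helpers.py could look like the following sketch, which assumes the configuration lives in a JSON file. The path parameter and the JSON format are assumptions.

```python
import json
from pathlib import Path


def read_config(path: str) -> dict:
    """Hypothetical helper: load a JSON config file and return it as a dict."""
    with Path(path).open(encoding="utf-8") as handle:
        config = json.load(handle)
    if not isinstance(config, dict):
        # Fail early so a malformed config doesn't surface later inside a task.
        raise ValueError(f"{path} must contain a JSON object")
    return config
```

Keeping helpers like this free of Airflow imports makes them easy to unit test outside the scheduler.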

Defining a DAG

The following examples demonstrate how to define a DAG using two approaches. In each approach, the DAG has a known owner, a specific start date, a fixed schedule expressed as a preset such as @daily or as a cron expression, and explicit retry logic to ensure consistent, fault-tolerant execution.

Approach 1: Classic DAG Method

This method uses the DAG object directly instead of decorators, but still supports the same parameters and scheduling logic.
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "nx1",
    "retries": 2,
    "retry_delay": timedelta(minutes=5)
}

with DAG(
    dag_id="nx1_classic_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",  # "schedule" replaces "schedule_interval", deprecated since Airflow 2.4
    catchup=False,
    default_args=default_args
) as dag:
    pass

Approach 2: TaskFlow API using a decorator

This approach uses a Python decorator to define scheduling and runtime behavior in a compact format. The DAG in the following code defines this behavior:
  • DAG starts running from January 1, 2025.
  • Executes daily at 12 AM.
  • Tasks retry up to 2 times.
  • 5-minute delay between retries.
  • No backfill of historical runs.
from airflow.decorators import dag
from datetime import datetime, timedelta

@dag(
    dag_id="nx1_sample_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={
        "owner": "nx1",
        "retries": 2,
        "retry_delay": timedelta(minutes=5)
    }
)
def sample_dag():
    pass

dag = sample_dag()
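To see what catchup=False avoids, the following rough model counts how many daily runs Airflow would create when a DAG is first enabled some time after its start_date. It's plain date arithmetic under simplifying assumptions (a @daily schedule and no timezone handling), and the function runs_to_create is hypothetical.

```python
from datetime import datetime, timedelta


def runs_to_create(start_date: datetime, now: datetime, catchup: bool) -> int:
    """Estimate the DAG runs created for a @daily schedule at enable time."""
    # A daily run covers one full day, so only completed days count.
    completed_days = max((now - start_date) // timedelta(days=1), 0)
    if catchup:
        return completed_days      # one run per missed interval
    return min(completed_days, 1)  # only the most recent interval


start = datetime(2025, 1, 1)
enabled_at = datetime(2025, 1, 4, 12, 0)
print(runs_to_create(start, enabled_at, catchup=True))   # 3
print(runs_to_create(start, enabled_at, catchup=False))  # 1
```

For a DAG whose start_date lies months in the past, the difference is hundreds of backfill runs versus one.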

Defining tasks

In Airflow, a task is a single unit of work inside a DAG. It defines what actually runs, for example, triggering a Spark job, calling an internal API, performing a data check, or moving data between systems. In NexusOne, tasks are primarily defined using the TaskFlow API with the @task decorator. The classic operators are still used where appropriate, for example, for Bash, Python, or Spark submission. The TaskFlow API also enables automatic XCom-based data passing, so that return values from one task get consumed by another without manual wiring. The following sections describe how to define tasks in different ways.

Use the TaskFlow API

When using the TaskFlow API, tasks are Python functions decorated with @task. Airflow turns these functions into tasks and automatically wires them into the DAG. The following example shows how to use the TaskFlow API. In the code:
  • The @task decorator tells Airflow to treat each decorated function as a task.
  • You define the DAG logic by calling the functions and assigning the results to variables. For example, cfg = read_config().
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="nx1_unified_taskflow",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",   # Daily at 2 AM
    catchup=False
)
def nx1_pipeline():

    @task
    def read_config() -> dict:
        return {
            "source": "s3://nx1-source/data.csv",
            "target": "iceberg.prod.sample_table",
            "mode": "overwrite"
        }

    @task
    def run_job(config: dict) -> str:
        print(f"Running job with config: {config}")
        return "SUCCESS"

    @task
    def update_status(status: str):
        print(f"Workflow finished with status: {status}")

    cfg = read_config()
    result = run_job(cfg)
    update_status(result)

dag = nx1_pipeline()

Pass data automatically using XCom

With @task, any return value from a task gets automatically stored in XCom and passed to downstream tasks as function arguments. You don’t need to manually call xcom_push or xcom_pull. From the example in the previous section:
  • read_config() returns a dict which is then passed as cfg into run_job(cfg).
  • run_job() returns "SUCCESS" which is then passed as result into update_status(result).
This makes it easy to pass data between tasks, such as:
  • Configuration dictionaries
  • Table or object names
  • Mode flags and status values
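Values passed through XCom have to be serializable, and by default Airflow stores them as JSON. The following sketch uses the standard json module as a conservative pre-check for a task's return value. Airflow's own encoder accepts some additional types, so this is an approximation, and the helper is_json_serializable is hypothetical.

```python
import json


def is_json_serializable(value) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


config = {"source": "s3://nx1-source/data.csv", "mode": "overwrite"}
print(is_json_serializable(config))    # True
print(is_json_serializable(object()))  # False
```

Small dictionaries, names, and flags pass this check. Large datasets are better written to storage, with only the path or table name returned from the task.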

Use operators

Airflow provides operators, which are prebuilt task types designed to execute specific actions. These actions include running shell commands, submitting Spark jobs, calling APIs, or triggering other DAGs. Operators define how a task runs, while Airflow manages when it runs. Commonly used operators are:
  • BashOperator: Executes shell commands or scripts.
  • PythonOperator: Runs a Python callable.
  • HttpOperator: Calls internal or external APIs.
  • EmailOperator: Sends notification emails.
The following example shows how to use Python and Bash operators to:
  1. Run a Python function to execute a data job.
  2. Run a shell command to archive logs after the job completes.
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

def run_data_job():
    # Placeholder: Real logic could trigger Spark, call an API, and so on.
    print("Running NX1 data job...")

@dag(
    dag_id="nx1_sample",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",   # Daily at 2 AM
    catchup=False,
    default_args={
        "owner": "nx1",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    }
)
def nx1_operator_pipeline():

    run_job = PythonOperator(
        task_id="run_data_job",
        python_callable=run_data_job,
    )

    archive_logs = BashOperator(
        task_id="archive_logs",
        bash_command=(
            "mkdir -p /var/log/nx1/archive && "
            "mv /var/log/nx1/*.log /var/log/nx1/archive/ 2>/dev/null || true"
        ),
    )

    # Execution order: Run the job, then archive its logs.
    run_job >> archive_logs

dag = nx1_operator_pipeline()
Core code explanation:
  • run_job is a variable holding a PythonOperator task that executes the run_data_job function.
  • archive_logs is a variable holding a BashOperator task that defines how to move log files after the job runs.
  • The dependency run_job >> archive_logs ensures that the run_job task runs before archive_logs.
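The >> syntax is ordinary Python operator overloading. The following toy model, which is not Airflow's implementation, shows the idea: the left-hand task records the right-hand task as downstream and returns it so that chains work. The ToyTask class is hypothetical.

```python
class ToyTask:
    """A toy stand-in for an operator, used only to illustrate >>."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        self.downstream.append(other)
        return other  # returning the right-hand task allows a >> b >> c


run_job = ToyTask("run_data_job")
archive_logs = ToyTask("archive_logs")
run_job >> archive_logs

print([t.task_id for t in run_job.downstream])  # ['archive_logs']
```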

Use @task.pyspark

The @task.pyspark decorator executes Spark code as a native Airflow task. When used inside a @dag decorator-defined workflow, Airflow automatically provisions a Spark driver. Airflow then attaches the driver to the configured cluster runtime and injects an active Spark session into the task at runtime. This removes the need to explicitly configure or manage a Spark connection inside the DAG file. The task author only needs to focus on the transformation logic, while the underlying environment handles cluster configuration, authentication, and resource allocation. The task function receives:
  • spark: An initialized SparkSession
  • sc: The SparkContext. This parameter is optional.
  • **context: The standard Airflow runtime context.
For example, in the following code:
  • The process_data function executes as a Spark app instead of a regular Python task.
  • The DAG requires no additional Spark connection or cluster configuration.
  • The environment provides the Spark session automatically.
  • The task reads from and writes to Iceberg tables through the configured catalog.
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="spark_task_example",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False
)
def spark_pipeline():

    @task.pyspark
    def process_data(spark, **context):

        df = spark.read.table("iceberg.prod.orders")

        df_clean = df.filter("status = 'complete'")

        # createOrReplace() replaces the table contents. overwrite() requires a filter condition.
        df_clean.writeTo("iceberg.prod.orders_clean").createOrReplace()

    process_data()

dag = spark_pipeline()
This is a recommended approach for all production Spark workloads orchestrated by Airflow.

Schedule, trigger, and task dependency

Airflow uses a schedule to control when it creates a new DAG run. A trigger creates a new DAG run immediately, bypassing the schedule. Lastly, task dependencies inside the DAG define the order in which its tasks execute. The following sections describe how to use each feature.

Schedule

The schedule controls when the DAG runs, regardless of whether it’s defined in the TaskFlow API or with the classic DAG method. Ways to express schedules:
  • Presets: @daily, @hourly, @weekly
  • Cron expressions:
    • 0 2 * * * - daily at 2 AM
    • */15 * * * * - every 15 minutes
In the following example, the DAG runs every day at 2 AM and executes a single task defined inside the nx1_scheduled_pipeline function.
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="nx1_scheduled_example",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",   # Runs daily at 2 AM
    catchup=False
)
def nx1_scheduled_pipeline():
    @task
    def job():
        print("Running scheduled job at 2 AM")

    job()

dag = nx1_scheduled_pipeline()
Airflow supports both preset schedules and cron expressions. You can use them in both the @dag decorator and the classic DAG class.

| Format type | Example value | Context |
| --- | --- | --- |
| Preset | @hourly | Runs once every hour |
| Preset | @daily | Runs once per day at midnight |
| Preset | @weekly | Runs once per week |
| Preset | @monthly | Runs once per month |
| Preset | @yearly/@annually | Runs once per year |
| Preset | @once | Runs only one time |

| Cron expression | Context |
| --- | --- |
| 0 2 * * * | Daily at 2:00 AM |
| 30 1 * * * | Daily at 1:30 AM |
| 0 */6 * * * | Every 6 hours |
| 0 9 * * 1 | Every Monday at 9:00 AM |
| 15 3 1 * * | First day of every month at 3:15 AM |

Airflow defines schedules using a five-field cron format: minute, hour, day-of-month, month, and day-of-week.
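To make the five fields concrete, the following minimal sketch splits a cron expression into named parts. It only checks the field count and doesn't validate the values. The helper split_cron is hypothetical and not part of Airflow.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict:
    """Map each of the five cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(split_cron("0 2 * * *"))
print(split_cron("*/15 * * * *")["minute"])  # */15
```

Reading "0 2 * * *" this way gives minute 0 and hour 2 with every other field unrestricted, which is daily at 2:00 AM.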

Trigger

There are several ways to trigger a DAG.
  • Use the Airflow UI: Navigate to the DAG you want to trigger in the Airflow web interface and click Trigger DAG. This initiates a new run of the DAG immediately.
    A list of DAGs in the Airflow UI
  • Use the command-line tool: The airflow dags trigger command executes a specified Airflow DAG manually via the command-line tool. It must run in an environment with access to the Airflow installation and its metadata database.
    airflow dags trigger <dag_id>
    
    The location where the command runs depends on your Airflow setup:
    | Environment | Execution method | Command example |
    | --- | --- | --- |
    | Local/Server | Standard terminal access | airflow dags trigger <dag_id> |
The DAG must be present in the DAGs folder, parsed by Airflow, and unpaused before it can run on its schedule or be triggered manually.
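When a script needs to trigger a DAG, it can assemble the same CLI call as an argument list. The following sketch only builds the command. The helper build_trigger_command is hypothetical, and the optional --conf flag passes a JSON payload to the run.

```python
import json
from typing import Optional


def build_trigger_command(dag_id: str, conf: Optional[dict] = None) -> list:
    """Build the argument list for `airflow dags trigger`."""
    command = ["airflow", "dags", "trigger"]
    if conf:
        # --conf takes a JSON string that the DAG run can read at runtime.
        command += ["--conf", json.dumps(conf)]
    command.append(dag_id)
    return command


print(build_trigger_command("nx1_sample_dag"))
print(build_trigger_command("nx1_sample_dag", {"mode": "overwrite"}))
```

On a host with the Airflow CLI available, the list can be passed to subprocess.run(command, check=True). Passing a list instead of a single shell string avoids quoting problems in the JSON payload.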

Task dependency

In the TaskFlow API, you create dependencies by calling task functions. When you call a @task-decorated function inside a @dag-decorated function and pass its return value to another task, Airflow makes the second task depend on the first. Airflow then moves that data between the tasks via XCom. In the following example, the DAG defines three tasks using the TaskFlow API:
  • read_config() produces a configuration dictionary.
  • run_job(cfg) consumes that dictionary and returns a status string.
  • update_status(result) consumes the status string to print the final outcome.
Data flows between the tasks via return values, which Airflow passes using XCom.
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="nx1_dependencies_example",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",   # Daily at 2 AM
    catchup=False
)
def nx1_pipeline():

    @task
    def read_config() -> dict:
        return {
            "job_name": "nx1-demo",
            "target_table": "iceberg.prod.sample_table"
        }

    @task
    def run_job(config: dict) -> str:
        print(f"Running job: {config['job_name']} on {config['target_table']}")
        return "SUCCESS"

    @task
    def update_status(status: str):
        print(f"Pipeline finished with status={status}")

    cfg = read_config()
    result = run_job(cfg)
    update_status(result)

dag = nx1_pipeline()
The dependencies are automatically created by the function call chain:
read_config() > run_job(cfg) > update_status(result)
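The chain above is a small dependency graph, and the execution order falls out of a topological sort. The following sketch uses Python's standard graphlib module to show this for the three tasks. It illustrates the concept and isn't how Airflow schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on.
dependencies = {
    "read_config": set(),
    "run_job": {"read_config"},
    "update_status": {"run_job"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['read_config', 'run_job', 'update_status']
```

Because the graph must be acyclic, a sort like this always exists for a valid DAG. Adding a dependency from read_config back to update_status would make TopologicalSorter raise a CycleError.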

Connections, hooks, and variables

Airflow provides connections and variables to decouple credentials and configuration from the DAG code. In NexusOne, this is how DAGs securely access databases, APIs, Spark clusters, object storage, and internal services, without hardcoding sensitive values.
  • Connections: Securely store system and service credentials.
  • Hooks: Programmatic interfaces that use connections to interact with systems.
  • Variables: Store runtime configuration values that tasks can read dynamically.
You can manage connections and variables through the Airflow UI or CI/CD pipelines, and DAGs can access them programmatically. The following sections describe how to use each feature.

Use an Airflow connection in a task

Airflow connections are typically accessed through Hooks. The following is an example using a Postgres connection. Similar patterns apply to Trino, HTTP, and so on.
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

@dag(
    dag_id="nx1_validate_postgres",
    start_date=datetime(2025,1,1),
    schedule="@daily",
    catchup=False
)
def nx1_pg_test():

    @task
    def validate():
        hook = PostgresHook(postgres_conn_id="nx1_postgres")
        conn = hook.get_conn()
        cursor = conn.cursor()

        cursor.execute("SELECT 1")
        print(cursor.fetchone())

    validate()

dag = nx1_pg_test()
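The validate task above leaves its cursor and connection open. The object returned by hook.get_conn() follows Python's DB-API, so the usual cleanup pattern applies. The following sketch shows that pattern with sqlite3 standing in for Postgres so that it runs without a database server. The function run_check is hypothetical.

```python
import sqlite3
from contextlib import closing


def run_check(conn) -> tuple:
    """Run SELECT 1 on a DB-API connection and close the cursor afterwards."""
    with closing(conn.cursor()) as cursor:
        cursor.execute("SELECT 1")
        return cursor.fetchone()


# sqlite3 is a stand-in here. In the DAG, conn would come from hook.get_conn().
with closing(sqlite3.connect(":memory:")) as conn:
    print(run_check(conn))  # (1,)
```

Wrapping both objects in closing() releases them even if the query raises, which matters for tasks that retry.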

Create an Airflow variable in the UI

Follow these steps to create an Airflow variable using the UI:
  1. Open the Airflow UI.
  2. At the top menu, click Admin > Variables.
  3. Click the + icon.
  4. In the Key field, enter the variable name, and in the Val field, enter its value.
  5. Click Save.
Creating an Airflow variable in the UI

Use Airflow variables in a DAG

In the following code example, the DAG reads the nx1_source_path and nx1_target_table variables at runtime instead of hardcoding their values.
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime

@dag(
    dag_id="nx1_variables_example",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False
)
def nx1_variables_pipeline():

    @task
    def read_config():
        source_path = Variable.get("nx1_source_path")
        target_table = Variable.get("nx1_target_table")
        print(f"Source: {source_path}")
        print(f"Target: {target_table}")

    read_config()

dag = nx1_variables_pipeline()

Monitoring DAGs

Airflow provides full visibility into the state of each DAG and task run. In NexusOne, this is how users and platform teams do the following:
  • Debug failures
  • Verify that pipelines are running as expected
  • Track historical behavior over time
Monitoring DAGs is primarily done through the following:
  • A task status describing the task lifecycle.
  • Logs describing the execution detail of a task.
  • A DAG run history showing the timeline of each DAG.
The following sections describe how to use each of these features.

Task status

Every task instance in Airflow has a status that indicates its current state, such as:
  • running: The task is currently running.
  • success: The task finished without errors.
  • failed: The task raised an exception or returned an error.
  • up_for_retry: The task failed and is waiting to run again because it has retries remaining.
  • skipped: The task was intentionally not run, usually due to a branch or condition.
  • queued: Waiting for executor resources.
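When reviewing a run, it often helps to separate tasks that have finished from tasks that are still in progress. The following sketch groups the states listed above. The state list passed in is made-up sample data, and the helper summarize is hypothetical.

```python
from collections import Counter

# States from the list above that mean a task instance is done.
FINISHED_STATES = {"success", "failed", "skipped"}


def summarize(states: list) -> tuple:
    """Return (count per state, number of task instances not yet finished)."""
    counts = Counter(states)
    unfinished = sum(n for state, n in counts.items() if state not in FINISHED_STATES)
    return counts, unfinished


sample = ["success", "success", "running", "up_for_retry", "queued", "failed"]
counts, unfinished = summarize(sample)
print(dict(counts))
print(unfinished)  # 3
```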
Follow these steps to see the task status in a DAG’s graph.
  1. Open the Airflow UI.
  2. Click on a DAG.
  3. Click Graph.
The following image describes a DAG’s graph view. Each task is color-coded, and this color represents the task’s state. Also, each rectangle represents a task instance. When you click a task instance, it shows its status, start/end time, duration, and a link to the logs.
A graph representing several tasks in a DAG.

Access logs per task

Airflow captures all stdout, stderr, and Python logging output for each task run. These logs are essential for understanding why a task failed. Follow these steps to access a DAG’s task logs:
  1. Open the Airflow UI.
  2. Click on a DAG.
  3. Click Graph and then select a task.
  4. Click Log.
Logs of a task in a DAG.

View a DAG run history

For each DAG run, Airflow maintains a run history, such as the following:
  • Overall DAG state, such as success, failed, or running.
  • DAG ID
  • Run ID
  • Run type
  • The start and end date of the execution.
Follow these steps to view the history of all previous DAG runs:
  1. Open the Airflow UI.
  2. Click Browse > DAG Runs.
DAG run history.
You should see rows of several DAGs and their run history details, displayed in columns.

Deployment workflow

DAGs are version-controlled workflow assets that you deploy through a controlled delivery process. You can author DAG definitions in a Git repository and push them into the Airflow environment via an automated or semi-automated deployment pipeline. This approach ensures the following:
  • Workflow definitions are versioned and traceable.
  • Changes are peer-reviewed and auditable.
  • Deployments are consistent across environments.
  • No manual edits are made in production.
The standard deployment pattern for Airflow follows this model:
Git repository > Deployment pipeline > Airflow DAGs folder
The high-level flow:
  1. You create DAGs and maintain them in a source-controlled repository.
  2. Changes are peer-reviewed and merged to a designated branch.
  3. A deployment pipeline validates and syncs DAG files.
  4. The pipeline places DAGs into the Airflow DAGs directory.
  5. The Airflow scheduler automatically discovers and registers the DAGs.
Because the DAG directory is continuously monitored, newly deployed DAGs become available without restarting Airflow.
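The validation step in the pipeline can start with something as simple as a syntax check. The following sketch parses every Python file under a DAGs folder with the standard ast module and reports the ones that fail. The function find_syntax_errors is hypothetical, and a real pipeline would likely also load the files with Airflow itself, for example through DagBag, to catch import errors.

```python
import ast
from pathlib import Path


def find_syntax_errors(dags_dir: str) -> dict:
    """Return {file path: error message} for Python files that fail to parse."""
    errors = {}
    for path in sorted(Path(dags_dir).rglob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

A pipeline can call find_syntax_errors on the checked-out dags/ folder and stop the deployment when the result isn't empty, so that a broken file never reaches the scheduler.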

Additional resources

  • For more details about Airflow, refer to the Airflow official documentation.
  • If you are using the NexusOne portal and want to learn how to launch Airflow, refer to the Monitor page.