πŸŒͺ️ Apache Airflow - Production Workflow Orchestration

Apache Airflow is a powerful, open-source platform to programmatically author, schedule, and monitor workflows. It is the de facto industry standard for data pipeline orchestration.

Table of Contents

  1. Core Concepts
  2. Getting Started
  3. Building DAGs
  4. Operators & Tasks
  5. Advanced Features
  6. Best Practices

🎯 Core Concepts

What is Apache Airflow?

Airflow is a workflow orchestration tool that allows you to:

  • Define workflows as code - Python-based DAGs for version control
  • Schedule tasks - Cron-like scheduling with flexibility
  • Monitor execution - Beautiful UI with rich logging
  • Handle failures - Retry mechanisms and error callbacks
  • Scale horizontally - Distributed execution with Executors

Key Terminology

DAG (Directed Acyclic Graph): A collection of tasks organized with dependencies
Task: A unit of work (operator instance)
Operator: A Python class that defines what a task does
Task Instance: A specific run of a task for a given DAG run
Executor: What runs tasks (Local, Celery, Kubernetes, etc.)
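
These pieces fit together as a graph: the scheduler's core job is to run each task only after its upstream tasks have finished. A minimal plain-Python sketch of that idea (a toy model for illustration, not Airflow's API):

```python
# Toy model of a DAG: tasks plus dependency edges (NOT Airflow's API)
dag = {
    'extract': [],             # no upstream tasks
    'transform': ['extract'],  # runs after extract
    'load': ['transform'],     # runs after transform
}

def topological_order(dag):
    """Return a task order that respects every dependency edge."""
    order, done = [], set()

    def visit(task):
        for upstream in dag[task]:
            if upstream not in done:
                visit(upstream)
        if task not in done:
            done.add(task)
            order.append(task)

    for task in dag:
        visit(task)
    return order

print(topological_order(dag))  # ['extract', 'transform', 'load']
```

Because the graph is acyclic, such an order always exists; a cycle would make scheduling impossible, which is why Airflow rejects cyclic DAGs at parse time.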


πŸš€ Getting Started

Installation and Setup

# Install Airflow with the matching constraints file (the documented install
# method; adjust "3.10" to your Python version)
pip install "apache-airflow==2.6.0" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.0/constraints-3.10.txt"

# Set AIRFLOW_HOME environment variable
export AIRFLOW_HOME=~/airflow

# Initialize the database
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start the web server
airflow webserver --port 8080

# Start the scheduler (in another terminal)
airflow scheduler

Basic Directory Structure

airflow_home/
β”œβ”€β”€ dags/                 # DAG definitions
β”œβ”€β”€ logs/                 # Task logs
β”œβ”€β”€ plugins/              # Custom operators/hooks
β”œβ”€β”€ airflow.cfg           # Configuration file
└── airflow.db            # SQLite metadata database (default; use Postgres/MySQL in production)
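
A few settings in `airflow.cfg` are worth knowing early. The keys below exist in Airflow 2.6; the values shown are illustrative examples, not defaults you must use:

```ini
[core]
dags_folder = /home/user/airflow/dags
executor = LocalExecutor

[database]
; SQLite is fine for local experiments only; production needs Postgres/MySQL
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow

[webserver]
web_server_port = 8080
```

Every config key can also be set via an environment variable of the form `AIRFLOW__SECTION__KEY`, which is the usual approach in containerized deployments.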

πŸ”§ Building DAGs

Simple DAG Example

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Define default arguments
default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
    'email': ['alert@example.com'],
    'email_on_failure': True,
}

# Create DAG
dag = DAG(
    'simple_etl_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule='@daily',           # Run daily ('schedule_interval' is deprecated since 2.4)
    catchup=False,               # Don't backfill
)

# Define Python functions
def extract_data():
    print("Extracting data from API...")
    return {"rows": 1000}

def transform_data(**context):
    ti = context['task_instance']
    data = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {data['rows']} rows...")
    return f"Transformed {data['rows']} rows"

# Define tasks
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,  # context kwargs are passed automatically in Airflow 2
    dag=dag,
)

load_task = BashOperator(
    task_id='load',
    bash_command='echo "Loading data into warehouse..."',
    dag=dag,
)

# Set dependencies
extract_task >> transform_task >> load_task
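
The `retries` and `retry_delay` entries in `default_args` behave roughly like the loop below (a plain-Python sketch of the scheduler's retry behavior, not Airflow code):

```python
import time

def run_with_retries(fn, retries=2, retry_delay=0.0):
    """Roughly what retries/retry_delay in default_args mean for one task."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # final attempt failed -> the task is marked failed
            time.sleep(retry_delay)

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retries(flaky)  # fails twice, succeeds on the third attempt
print(result)  # ok
```

With `retries=2` a task gets at most three attempts in total; only after the last one fails does Airflow mark the task instance failed and fire `email_on_failure`.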

DAG with Error Handling

def task_failure_callback(context):
    """Called when a task fails"""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} failed!")
    # Send alert, log error, etc.

def task_success_callback(context):
    """Called when a task succeeds"""
    task_instance = context['task_instance']
    print(f"Task {task_instance.task_id} succeeded!")

def risky_function():
    raise ValueError("simulated failure")  # placeholder for work that may fail

task = PythonOperator(
    task_id='risky_task',
    python_callable=risky_function,
    on_failure_callback=task_failure_callback,
    on_success_callback=task_success_callback,
    dag=dag,
)

πŸ“¦ Operators & Tasks

Common Operators

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated in Airflow 2

# Python Operator
python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
)

# Bash Operator
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python /scripts/etl.py',
)

# Email Operator
email_task = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Pipeline Complete',
    html_content='<p>ETL pipeline completed successfully</p>',
)

# Trigger another DAG
trigger_task = TriggerDagRunOperator(
    task_id='trigger_downstream',
    trigger_dag_id='downstream_dag_id',
)

Working with XCom (Cross-Communication)

def extract_task():
    return [1, 2, 3, 4, 5]

def transform_task(**context):
    # Pull data from upstream task
    data = context['task_instance'].xcom_pull(task_ids='extract')
    print(f"Received data: {data}")
    # Push data for downstream tasks
    return sum(data)

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_task,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_task,  # context kwargs are passed automatically in Airflow 2
    dag=dag,
)

extract >> transform
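
Because the callables are plain Python functions, the data flow can be unit-tested without a running Airflow. Here a dict stands in for the XCom backend (a simplification of what `xcom_pull` does):

```python
def extract_task():
    return [1, 2, 3, 4, 5]

def transform_task(data):
    return sum(data)

# Simulate the XCom store: return values keyed by task_id
xcom = {'extract': extract_task()}
result = transform_task(xcom['extract'])
print(result)  # 15
```

Keep XCom payloads small (they are serialized into the metadata database); pass references such as file paths or table names for large data, not the data itself.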

🎯 Advanced Features

Branching Logic

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def decide_branch(**context):
    # Return the task_id (or list of task_ids) of the branch to follow
    if some_condition:
        return 'task_a'
    return 'task_b'

branch_task = BranchPythonOperator(
    task_id='branch',
    python_callable=decide_branch,  # context kwargs are passed automatically in Airflow 2
    dag=dag,
)

task_a = PythonOperator(task_id='task_a', ...)
task_b = PythonOperator(task_id='task_b', ...)

# One branch is always skipped, so relax end_task's trigger rule;
# the default 'all_success' would skip it as well
end_task = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')

branch_task >> [task_a, task_b] >> end_task
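
The branch callable is just a function that returns a task_id, so the routing decision itself is easy to test in isolation (the `row_count` condition below is hypothetical, standing in for whatever your real condition is):

```python
def decide_branch(row_count, threshold=100):
    # Route to the heavy pipeline only when there is enough new data
    return 'task_a' if row_count > threshold else 'task_b'

print(decide_branch(500))  # task_a
print(decide_branch(10))   # task_b
```

Extracting the decision into a pure function like this keeps the operator wiring thin and the business logic testable with ordinary unit tests.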

Dynamic Task Generation

from airflow.models import Variable

# Get the list of tables to process from an Airflow Variable
# (falls back to a default when the Variable is not set)
tables = Variable.get(
    'tables_to_process',
    deserialize_json=True,
    default_var=['users', 'orders', 'products'],
)

# Create tasks dynamically
for table in tables:
    process_table = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table_func,
        op_kwargs={'table': table},
        dag=dag,
    )

    extract_task >> process_table >> load_task
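
Each loop iteration adds one task between `extract_task` and `load_task`, producing a fan-out/fan-in shape. Two properties are worth checking: task_ids must be unique within a DAG, and the number of dependency edges grows with the table list (a sketch modelling edges as tuples):

```python
tables = ['users', 'orders', 'products']

# task_ids must be unique within a DAG
task_ids = [f'process_{table}' for table in tables]
assert len(task_ids) == len(set(task_ids))

# extract fans out to every table task; every table task fans in to load
edges = [('extract', t) for t in task_ids] + [(t, 'load') for t in task_ids]
print(len(edges))  # 6
```

Note that the loop runs at DAG parse time, so the task list must be cheap to compute; avoid database calls or API requests in top-level DAG code.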

Passing Configuration

# DAG with parameters
dag = DAG(
    'configurable_dag',
    params={
        'environment': 'production',
        'job_id': 123,
    },
)

def run_job(**context):
    env = context['params']['environment']
    job_id = context['params']['job_id']
    print(f"Running job {job_id} in {env}")

βœ… Best Practices

1. Idempotent Tasks

Every task should produce the same result when run multiple times.

def idempotent_load(**context):
    # Use MERGE or DELETE+INSERT instead of a blind INSERT,
    # or write to a date-partitioned path
    partition = context['ds_nodash']  # e.g. '20240115' ('execution_date' is deprecated)
    # Load to /data/2024/01/15/ (reruns overwrite the same partition -> idempotent)
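
The key property is that the output location is a pure function of the run's logical date, so re-running a task writes to the same place instead of duplicating data:

```python
from datetime import datetime

def partition_path(logical_date):
    # Same date in -> same path out, so a rerun overwrites rather than appends
    return f"/data/{logical_date:%Y/%m/%d}/"

print(partition_path(datetime(2024, 1, 15)))  # /data/2024/01/15/
```

With this layout, backfills and retries are safe by construction: running the January 15 task ten times still yields exactly one copy of January 15's data.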

2. Proper Error Handling

from airflow.exceptions import AirflowException

def robust_task():
    try:
        result = perform_operation()
    except SpecificError:
        # Handle expected, recoverable errors
        return None
    except Exception as e:
        # Fail the task with a clear message; Airflow applies the task's retries
        raise AirflowException(f"Unexpected error: {e}") from e
    return result

3. Use Sensors for Waiting

from airflow.sensors.filesystem import FileSensor
from airflow.providers.common.sql.sensors.sql import SqlSensor  # 'airflow.sensors.sql' is deprecated

# Wait for a file (poke every 60s, give up after 300s)
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    poke_interval=60,
    timeout=300,
)

# Wait until the query returns a truthy first cell
wait_for_data = SqlSensor(
    task_id='wait_for_data',
    conn_id='postgres_default',
    sql="SELECT COUNT(*) FROM my_table WHERE date = '{{ ds }}'",  # templated logical date
    poke_interval=60,
    timeout=600,
)
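
Under the hood a sensor is a poll loop: it re-runs its check every `poke_interval` seconds and fails once `timeout` is exceeded. A simplified plain-Python model (with short intervals so it runs instantly; real sensors also offer a `reschedule` mode that frees the worker slot between pokes):

```python
import time

def poke_until(check, poke_interval=0.01, timeout=1.0):
    """Simplified model of a sensor's poke loop."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)

attempts = []
def file_exists():
    attempts.append(1)
    return len(attempts) >= 3  # the "file" appears on the third poke

ok = poke_until(file_exists)
print(ok)  # True
```

The trade-off to remember: in the default `poke` mode, the sensor occupies a worker slot for its entire wait, so long waits with short intervals can starve the rest of the DAG.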

4. Implement Logging

import logging

logger = logging.getLogger(__name__)

def log_task():
    logger.info("Starting task execution")
    logger.error("An error occurred")
    logger.warning("Be careful!")

5. Test Your DAGs Locally

# Check for DAG import errors
airflow dags list-import-errors

# Run a single task for a given date (no state is recorded in the database)
airflow tasks test my_dag my_task 2024-01-01

# Run the whole DAG once, locally, without a scheduler
airflow dags test my_dag 2024-01-01

πŸ“š Resources

  • Official Documentation: airflow.apache.org
  • Airflow Tutorial: Comprehensive guide on Apache Airflow
  • Community: Active Slack channel and GitHub discussions
  • Deployment: Helm charts for Kubernetes deployment

Last updated: April 12, 2026
Difficulty: Intermediate to Advanced
Prerequisites: Python, understanding of ETL concepts