# Apache Airflow Orchestration

A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.

## When to Use This Skill

Use this skill when:

- Building and managing complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows across multiple systems
- Scheduling and monitoring batch processing jobs
- Coordinating multi-step data transformations
- Managing workflows with conditional execution and branching
- Implementing event-driven or asset-based workflows
- Deploying production-grade workflow automation
- Creating dynamic workflows that generate tasks programmatically
- Coordinating distributed task execution across clusters
- Building data engineering platforms with workflow orchestration

## Core Concepts

### What is Apache Airflow?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.

**Key Principles:**
- **Dynamic**: Workflows are defined in Python, enabling dynamic generation
- **Extensible**: Rich ecosystem of operators, sensors, and hooks
- **Scalable**: Scales from a single machine to large clusters
- **Observable**: Comprehensive UI for monitoring and troubleshooting

### DAGs (Directed Acyclic Graphs)

A DAG is a collection of tasks organized to reflect their relationships and dependencies.

**DAG Properties:**
- **dag_id**: Unique identifier for the DAG
- **start_date**: When the DAG should start being scheduled
- **schedule**: How often to run (cron, timedelta, or asset-based)
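- **catchup**: Whether the scheduler creates runs for past intervals (between `start_date` and now) that have not run yet, for example when the DAG is first unpaused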
- **tags**: Labels for organization and filtering
- **default_args**: Default parameters for all tasks in the DAG

**DAG Definition Example:**
```python
from datetime import datetime
from airflow.sdk import DAG

with DAG(
    dag_id="example_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",  # Daily at midnight
    catchup=False,
    tags=["example", "tutorial"],
) as dag:
    # Tasks defined here
    pass
```
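
To see what `catchup` changes, the sketch below models which data intervals a simple fixed-interval schedule has completed by a given moment. It is a toy, not Airflow's scheduler: it assumes a `timedelta` schedule, ignores timezones and runs that already exist, and the helper name `due_intervals` is hypothetical. The idea it encodes is that a run for an interval is created only after that interval ends, and that with `catchup=False` only the most recent completed interval is kept.

```python
from datetime import datetime, timedelta


def due_intervals(start_date, interval, now, catchup):
    """Toy model: the (start, end) data intervals completed by `now`.

    Hypothetical helper for illustration; the real scheduler also handles
    cron/timetable schedules, timezones, and runs that already exist.
    """
    intervals = []
    begin = start_date
    # An interval becomes runnable only once its end has passed.
    while begin + interval <= now:
        intervals.append((begin, begin + interval))
        begin += interval
    # With catchup disabled, keep only the latest completed interval.
    return intervals if catchup else intervals[-1:]


start = datetime(2022, 1, 1)
now = datetime(2022, 1, 5, 6, 0)
print(len(due_intervals(start, timedelta(days=1), now, catchup=True)))   # 4
print(len(due_intervals(start, timedelta(days=1), now, catchup=False)))  # 1
```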

### Tasks and Operators

**Tasks** are the basic units of execution in Airflow. **Operators** are templates for creating tasks.

**Common Operator Types:**

1. **BashOperator**: Execute bash commands
2. **PythonOperator**: Execute Python functions
3. **EmailOperator**: Send emails
4. **EmptyOperator**: Placeholder/dummy tasks
5. **Custom Operators**: User-defined operators for specific needs

**Operator vs. Task:**
- Operator: Template/class definition
- Task: Instantiation of an operator with specific parameters
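
The class/instance split can be illustrated without Airflow installed. In the sketch below, `ToyOperator` and its `message` parameter are hypothetical stand-ins; the one convention it mirrors is that Airflow operators, including custom ones built by subclassing `BaseOperator`, do their work in an `execute(self, context)` method.

```python
class ToyOperator:
    """Hypothetical stand-in for an operator class (the template).

    Not Airflow's BaseOperator; it only mirrors the execute(context) contract.
    """

    def __init__(self, task_id, message):
        self.task_id = task_id
        self.message = message

    def execute(self, context):
        # Airflow operators perform their work in execute(self, context).
        return f"{self.task_id} ({context['run_id']}): {self.message}"


# One operator class, two tasks: each instance is a separate node in a DAG.
say_hello = ToyOperator(task_id="say_hello", message="hello")
say_bye = ToyOperator(task_id="say_bye", message="bye")

print(say_hello.execute({"run_id": "manual_1"}))  # say_hello (manual_1): hello
```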

### Task Dependencies

Task dependencies define the execution order and workflow structure.

**Dependency Operators:**
- `>>`: Sets downstream dependency (task1 >> task2)
- `<<`: Sets upstream dependency (task2 << task1)
- `chain()`: Sequential dependencies for multiple tasks
- `cross_downstream()`: Many-to-many relationships

**Dependency Examples:**
```python
# Simple linear flow
task1 >> task2 >> task3

# Fan-out pattern
task1 >> [task2, task3, task4]

# Fan-in pattern
[task1, task2, task3] >> task4

# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task  # fourth_task runs before third_task
```
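
The `>>` and `<<` syntax works through Python operator overloading. The sketch below is a hypothetical `Node` class, not Airflow code, that records edges the same way: `a >> b` returns `b` so expressions can be chained, and because a plain list has no `>>`, `[a, b] >> c` falls through to `c.__rrshift__`.

```python
def _as_list(tasks):
    """Treat a single task and a list of tasks uniformly."""
    return tasks if isinstance(tasks, list) else [tasks]


class Node:
    """Toy task that records dependency edges (hypothetical, for illustration)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in _as_list(others):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning the right operand enables a >> b >> c

    def __lshift__(self, other):   # self << other: other runs first
        for up in _as_list(other):
            up.set_downstream(self)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (list has no >>, so Python calls this)
        for up in other:
            up.set_downstream(self)
        return self

    def __rlshift__(self, other):  # [a, b] << self: self runs first
        self.set_downstream(other)
        return self


first, second, third, fourth = (Node(t) for t in ("first", "second", "third", "fourth"))
first >> [second, third]
third << fourth
print(sorted(third.upstream))  # ['first', 'fourth']
```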

### Executors

Executors determine how and where tasks run.

**Executor Types:**
- **SequentialExecutor**: Runs one task at a time on the local machine (the Airflow 2.x default; not for production, and removed in Airflow 3)
- **LocalExecutor**: Runs tasks in parallel processes on a single machine
- **CeleryExecutor**: Distributed execution using Celery
- **KubernetesExecutor**: Each task runs in a separate Kubernetes pod
- **DaskExecutor**: Distributed execution using Dask

### Scheduler

The Airflow scheduler:
- Monitors all DAGs and their tasks
- Triggers task instances based on dependencies and schedules
- Submits tasks to executors for execution
- Handles retries and task state management

**Starting the Scheduler:**
```bash
airflow scheduler
```
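
The scheduler's core rule (start a task only after its upstream tasks succeed, and retry failures) can be approximated with Python's standard `graphlib` module. This is a toy model, not the real scheduler: everything runs in one process, and the `deps`/`callables` format and function names are hypothetical. The `upstream_failed` state and the "all upstream tasks must succeed" check do correspond to Airflow's default `all_success` trigger rule.

```python
from graphlib import TopologicalSorter


def run_with_retries(func, retries):
    """Call func up to retries + 1 times; return the resulting state."""
    for _ in range(retries + 1):
        try:
            func()
            return "success"
        except Exception:
            continue  # a real scheduler would wait retry_delay before retrying
    return "failed"


def run_dag(deps, callables, retries=1):
    """Toy scheduler loop that runs tasks in dependency order.

    deps maps task_id -> set of upstream task_ids (hypothetical format);
    callables maps task_id -> zero-argument function.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    states = {}
    while sorter.is_active():
        for task_id in sorter.get_ready():
            upstream = deps.get(task_id, set())
            if all(states[u] == "success" for u in upstream):
                states[task_id] = run_with_retries(callables[task_id], retries)
            else:
                # all_success rule: skip execution if any upstream did not succeed.
                states[task_id] = "upstream_failed"
            sorter.done(task_id)
    return states


deps = {"transform": {"extract"}, "load": {"transform"}}
ok = lambda: None
print(run_dag(deps, {"extract": ok, "transform": ok, "load": ok}))
```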

## DAG Development Patterns

### Basic DAG Structure

A minimal DAG has three parts: imports, a `DAG` context manager, and the tasks and dependencies declared inside it:

```python
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="basic_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dag:

    task1 = BashOperator(
        task_id="task1",
        bash_command="echo 'Task 1 executed'"
    )

    task2 = BashOperator(
        task_id="task2",
        bash_command="echo 'Task 2 executed'"
    )

    task1 >> task2
```

### Task Dependencies and Chains

**Linear Chain:**
```python
from airflow.sdk import chain

# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```

**Dynamic Chain:**
```python
from airflow.sdk import chain
from airflow.providers.standard.operators.empty import EmptyOperator

# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
```

**Pairwise Chain:**
```python
from airflow.sdk import chain

# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```
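
The rules `chain` applies can be written as a small pure-Python function that returns `(upstream, downstream)` pairs instead of wiring real tasks. `chain_edges` is a hypothetical helper that follows the behavior described in the comments above: a single task next to a list fans out or in, and two adjacent lists are paired element by element. Airflow rejects adjacent lists of different lengths; the sketch imitates that with a `ValueError`.

```python
def chain_edges(*elements):
    """Toy model of chain(): return (upstream, downstream) pairs.

    Each element is a task name or a list of task names (hypothetical helper).
    """
    edges = []
    for up, down in zip(elements, elements[1:]):
        up_is_list = isinstance(up, (list, tuple))
        down_is_list = isinstance(down, (list, tuple))
        if up_is_list and down_is_list:
            # Two adjacent lists are connected pairwise, so lengths must match.
            if len(up) != len(down):
                raise ValueError("adjacent lists must have the same length")
            edges.extend(zip(up, down))
        elif up_is_list:
            edges.extend((u, down) for u in up)    # fan-in
        elif down_is_list:
            edges.extend((up, d) for d in down)    # fan-out
        else:
            edges.append((up, down))               # simple link
    return edges


print(chain_edges("op1", ["op2", "op3"], ["op4", "op5"], "op6"))
```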

**Cross Downstream:**
```python
from airflow.sdk import cross_downstream

# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
```
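
Unlike the pairwise form of `chain` shown earlier, `cross_downstream` connects every task in the first list to every task in the second, which amounts to a Cartesian product. The helper below is a hypothetical pure-Python equivalent that only lists the resulting edges:

```python
from itertools import product


def cross_downstream_edges(from_tasks, to_tasks):
    """Toy model of cross_downstream(): every source feeds every target."""
    return list(product(from_tasks, to_tasks))


edges = cross_downstream_edges(["op1", "op2"], ["op3", "op4"])
print(len(edges))  # 4
```

Two sources and two targets yield four edges here, whereas pairing two lists in `chain` yields only two.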

### Branching and Conditional Execution

**BranchPythonOperator:**
```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import BranchPythonOperator

def choose_branch(**context):
    if context['data_interval_start'].day == 1:
        return 'monthly_task'
    return 'daily_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')

branch >> [daily_task, monthly_task]
```
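
Because `python_callable` is an ordinary function, its branching logic can be checked without a running Airflow instance by passing a stub context. The sketch assumes the callable reads only `data_interval_start`; a real Airflow context contains many more keys.

```python
from datetime import datetime


def choose_branch(**context):
    # Same logic as the BranchPythonOperator callable above.
    if context["data_interval_start"].day == 1:
        return "monthly_task"
    return "daily_task"


# A plain dict stands in for the Airflow context.
print(choose_branch(**{"data_interval_start": datetime(2024, 3, 1)}))   # monthly_task
print(choose_branch(**{"data_interval_start": datetime(2024, 3, 15)}))  # daily_task
```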

**Custom Branch Operator:**
```python
from airflow.providers.standard.operators.branch import BaseBranchOperator

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run extra branch on first day of month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None  # Skip all downstream tasks
```
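
The three return shapes in `MyBranchOperator` (a list of IDs, a single ID, or `None`) can be modeled as splitting the directly downstream task IDs into followed and skipped. This is a simplification with a hypothetical helper name: in Airflow, a skipped task also causes its own downstream tasks to be skipped under the default `all_success` trigger rule, which the toy does not model.

```python
def branch_outcome(returned, downstream_ids):
    """Toy model: split direct downstream task IDs into (followed, skipped).

    `returned` is what choose_branch returns: a task_id, a list of task_ids,
    or None (follow nothing, skip everything).
    """
    if returned is None:
        chosen = set()
    elif isinstance(returned, str):
        chosen = {returned}
    else:
        chosen = set(returned)
    followed = [t for t in downstream_ids if t in chosen]
    skipped = [t for t in downstream_ids if t not in chosen]
    return followed, skipped


ids = ["daily_task_id", "monthly_task_id"]
print(branch_outcome("daily_task_id", ids))  # (['daily_task_id'], ['monthly_task_id'])
print(branch_outcome(None, ids))             # ([], ['daily_task_id', 'monthly_task_id'])
```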

### TaskGroups for Organization

TaskGroups help organize related tasks hierarchically:

```python
from airflow.sdk import task_group
from airflow.providers.standard.operators.empty import EmptyOperator

@task_group()
def data_processing_group():
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    extract >> transform >> load

@task_group()
def validation_group():
    validate_schema = EmptyOperator(task_id="validate_schema")
    validate_data = EmptyOperator(task_id="validate_data")

    validate_schema >> validate_data

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start >> data_processing_group() >> validation_group() >> end
```
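
Two behaviors make the last line of that example work. Task IDs inside a group are prefixed with the group ID by default (for `@task_group`, the group ID defaults to the function name), and `>>` on a group connects into the group's root tasks and out of its leaf tasks. The helper below is a hypothetical pure-Python model of that bookkeeping, not Airflow code.

```python
def group_roots_and_leaves(group_id, task_ids, edges):
    """Toy model of a TaskGroup's entry (root) and exit (leaf) tasks.

    task_ids: task IDs declared in the group; edges: (upstream, downstream)
    pairs inside the group. Returns prefixed (roots, leaves).
    """
    def qualify(task_id):
        # Mirrors the default prefixing: "<group_id>.<task_id>".
        return f"{group_id}.{task_id}"

    has_upstream = {down for _, down in edges}
    has_downstream = {up for up, _ in edges}
    roots = [qualify(t) for t in task_ids if t not in has_upstream]
    leaves = [qualify(t) for t in task_ids if t not in has_downstream]
    return roots, leaves


roots, leaves = group_roots_and_leaves(
    "data_processing_group",
    ["extract", "transform", "load"],
    [("extract", "transform"), ("transform", "load")],
)
print(roots, leaves)  # ['data_processing_group.extract'] ['data_processing_group.load']
```

In this model, `start >> data_processing_group()` would attach `start` to `data_processing_group.extract`, and the group's exit point toward `validation_group` would be `data_processing_group.load`.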

### Edge Labeling

Add labels to dependency edges for clarity:

```python
from airflow.sdk import Label

# Inline labeling
my_task >> Label("When empty") >> other_task

# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
```

### LatestOnlyOperator

Skip downstream tasks when the current DAG run is not the most recent scheduled run (for example, during catchup):

```python
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator

with DAG(
    dag_id='latest_only_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
    schedule="@daily",
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
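    task1 = EmptyOperator(task_id='task1')

    # Older (catchup) runs skip task1; only the latest run executes it
    latest_only >> task1
```
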