apache-airflow-orchestration
Complete guide for Apache Airflow orchestration including DAGs, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment
# Apache Airflow Orchestration
A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.
## When to Use This Skill
Use this skill when:
- Building and managing complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows across multiple systems
- Scheduling and monitoring batch processing jobs
- Coordinating multi-step data transformations
- Managing workflows with conditional execution and branching
- Implementing event-driven or asset-based workflows
- Deploying production-grade workflow automation
- Creating dynamic workflows that generate tasks programmatically
- Coordinating distributed task execution across clusters
- Building data engineering platforms with workflow orchestration
## Core Concepts
### What is Apache Airflow?
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.
**Key Principles:**
- **Dynamic**: Workflows are defined in Python, enabling dynamic generation
- **Extensible**: Rich ecosystem of operators, sensors, and hooks
- **Scalable**: Scales from a single machine to large clusters
- **Observable**: Comprehensive UI for monitoring and troubleshooting
### DAGs (Directed Acyclic Graphs)
A DAG is a collection of tasks organized to reflect their relationships and dependencies.
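To make the "directed acyclic" part concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`), not Airflow itself. The task names are hypothetical, and this is not how Airflow resolves dependencies internally. It only shows that dependencies imply a valid execution order and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

# A valid execution order always runs a task after everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())

# Making "extract" depend on "report" closes a loop, so no order exists.
cyclic = dict(dependencies, extract={"report"})
try:
    list(TopologicalSorter(cyclic).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
```

Airflow rejects a DAG that contains a cycle for the same underlying reason: no task in the loop could ever be the first to run.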
**DAG Properties:**
- **dag_id**: Unique identifier for the DAG
- **start_date**: When the DAG should start being scheduled
- **schedule**: How often to run (cron, timedelta, or asset-based)
- **catchup**: Whether to run missed intervals on DAG activation
- **tags**: Labels for organization and filtering
- **default_args**: Default parameters for all tasks in the DAG
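The effect of `catchup` can be sketched in plain Python. The helper names below (`data_intervals`, `runs_on_activation`) are hypothetical and the model is simplified: it assumes a fixed `timedelta` schedule and ignores time zones and run limits. It only illustrates which intervals get a run when a DAG is activated some time after its `start_date`.

```python
from datetime import datetime, timedelta


def data_intervals(start_date, now, interval):
    """Return every complete (start, end) interval between start_date and now."""
    intervals = []
    start = start_date
    while start + interval <= now:
        intervals.append((start, start + interval))
        start += interval
    return intervals


def runs_on_activation(start_date, now, interval, catchup):
    """Simplified model: all missed intervals if catchup, else only the latest."""
    complete = data_intervals(start_date, now, interval)
    return complete if catchup else complete[-1:]


start = datetime(2022, 1, 1)
now = datetime(2022, 1, 5, 12, 0)
daily = timedelta(days=1)

backfilled = runs_on_activation(start, now, daily, catchup=True)
latest = runs_on_activation(start, now, daily, catchup=False)
```

In this sketch `backfilled` holds four daily intervals, while `latest` holds only the most recent complete one.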
**DAG Definition Example:**
```python
from datetime import datetime
from airflow.sdk import DAG
with DAG(
    dag_id="example_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",  # Daily at midnight
    catchup=False,
    tags=["example", "tutorial"],
) as dag:
    # Tasks defined here
    pass
```
### Tasks and Operators
**Tasks** are the basic units of execution in Airflow. **Operators** are templates for creating tasks.
**Common Operator Types:**
1. **BashOperator**: Execute bash commands
2. **PythonOperator**: Execute Python functions
3. **EmailOperator**: Send emails
4. **EmptyOperator**: Placeholder/dummy tasks
5. **Custom Operators**: User-defined operators for specific needs
**Operator vs. Task:**
- Operator: Template/class definition
- Task: Instantiation of an operator with specific parameters
### Task Dependencies
Task dependencies define the execution order and workflow structure.
**Dependency Operators:**
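- `>>`: Sets a downstream dependency (`task1 >> task2` runs `task2` after `task1`)
- `<<`: Sets an upstream dependency (`task2 << task1` also runs `task2` after `task1`)
- `chain()`: Sets sequential dependencies across multiple tasks
- `cross_downstream()`: Sets many-to-many dependencies between two lists of tasks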
**Dependency Examples:**
```python
# Simple linear flow
task1 >> task2 >> task3
# Fan-out pattern
task1 >> [task2, task3, task4]
# Fan-in pattern
[task1, task2, task3] >> task4
# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
```
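The `>>` and `<<` syntax above relies on Python operator overloading. The following is a toy model of that mechanism, not Airflow's real task class: `ToyTask` and `_as_list` are hypothetical names, and only the three dunder methods needed for the patterns shown are implemented.

```python
def _as_list(value):
    """Wrap a single task in a list so both forms can be handled uniformly."""
    return value if isinstance(value, list) else [value]


class ToyTask:
    """Toy stand-in for a task; tracks upstream and downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def _link(self, downstream_task):
        self.downstream.add(downstream_task.task_id)
        downstream_task.upstream.add(self.task_id)

    def __rshift__(self, other):
        # self >> other, or self >> [a, b]
        for task in _as_list(other):
            self._link(task)
        return other  # returning `other` is what makes a >> b >> c chain

    def __lshift__(self, other):
        # self << other, or self << [a, b]
        for task in _as_list(other):
            task._link(self)
        return other

    def __rrshift__(self, other):
        # [a, b] >> self: lists do not define >>, so Python falls back to this
        for task in _as_list(other):
            task._link(self)
        return self


first_task = ToyTask("first_task")
second_task = ToyTask("second_task")
third_task = ToyTask("third_task")
fourth_task = ToyTask("fourth_task")

# Same shape as the "complex dependencies" example
first_task >> [second_task, third_task]
third_task << fourth_task
```

After these two statements, `third_task` has both `first_task` and `fourth_task` upstream of it.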
### Executors
Executors determine how and where tasks run.
**Executor Types:**
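- **SequentialExecutor**: Runs one task at a time in a single process (the default in Airflow 2.x, not for production; removed in Airflow 3, where LocalExecutor is the default)
- **LocalExecutor**: Runs tasks in parallel processes on a single machine
- **CeleryExecutor**: Distributed execution using Celery workers
- **KubernetesExecutor**: Each task runs in a separate Kubernetes pod
- **DaskExecutor**: Distributed execution using Dask (moved out of core into a separate provider package in Airflow 2.7)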
### Scheduler
The Airflow scheduler:
- Monitors all DAGs and their tasks
- Triggers task instances based on dependencies and schedules
- Submits tasks to executors for execution
- Handles retries and task state management
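The retry behaviour mentioned in the last bullet can be sketched in plain Python. This is an illustration only, not the scheduler's implementation: `run_with_retries` and `flaky` are hypothetical names, and retry delays are omitted. It follows the Airflow convention that `retries=N` allows up to N + 1 attempts in total.

```python
def run_with_retries(task_callable, retries):
    """Run a callable, retrying on failure; return (final_state, attempts)."""
    attempts = 0
    while attempts <= retries:
        attempts += 1
        try:
            task_callable()
            return "success", attempts
        except Exception:
            # While attempts remain, a real task instance would be marked
            # up_for_retry here and rescheduled after its retry delay.
            continue
    return "failed", attempts


calls = {"count": 0}


def flaky():
    """Hypothetical task that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient error")


state, attempts = run_with_retries(flaky, retries=2)
```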
**Starting the Scheduler:**
```bash
airflow scheduler
```
## DAG Development Patterns
### Basic DAG Structure
Every DAG follows this structure:
```python
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
with DAG(
    dag_id="basic_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dag:
    task1 = BashOperator(
        task_id="task1",
        bash_command="echo 'Task 1 executed'",
    )
    task2 = BashOperator(
        task_id="task2",
        bash_command="echo 'Task 2 executed'",
    )
    task1 >> task2
```
### Task Dependencies and Chains
**Linear Chain:**
```python
from airflow.sdk import chain
# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```
**Dynamic Chain:**
```python
from airflow.sdk import chain
from airflow.providers.standard.operators.empty import EmptyOperator
# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
```
**Pairwise Chain:**
```python
from airflow.sdk import chain
# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```
**Cross Downstream:**
```python
from airflow.sdk import cross_downstream
# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
```
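To check which edges the pairwise `chain()` and `cross_downstream()` calls above produce, the pairing rules can be modelled in plain Python. The functions below (`chain_edges`, `cross_downstream_edges`) are hypothetical helpers that return `(upstream, downstream)` tuples for task names; they mirror the documented behaviour and are not Airflow's implementation.

```python
def _as_list(value):
    return list(value) if isinstance(value, (list, tuple)) else [value]


def chain_edges(*steps):
    """Return the (upstream, downstream) edges a chain-style call would create."""
    edges = []
    for up, down in zip(steps, steps[1:]):
        ups, downs = _as_list(up), _as_list(down)
        both_lists = isinstance(up, (list, tuple)) and isinstance(down, (list, tuple))
        if both_lists:
            # Two adjacent lists are paired element by element
            if len(ups) != len(downs):
                raise ValueError("adjacent lists must have the same length")
            edges.extend(zip(ups, downs))
        else:
            # A single task next to a list fans out or fans in
            edges.extend((u, d) for u in ups for d in downs)
    return edges


def cross_downstream_edges(from_tasks, to_tasks):
    """Every task in from_tasks becomes upstream of every task in to_tasks."""
    return [(u, d) for u in from_tasks for d in to_tasks]


pairwise = chain_edges("op1", ["op2", "op3"], ["op4", "op5"], "op6")
crossed = cross_downstream_edges(["op1", "op2"], ["op3", "op4"])
```

The difference is visible in the results: `pairwise` links `op2` only to `op4` and `op3` only to `op5`, while `crossed` links each upstream task to both downstream tasks.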
### Branching and Conditional Execution
**BranchPythonOperator:**
```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import BranchPythonOperator


def choose_branch(**context):
    # Return the task_id of the branch to follow
    if context['data_interval_start'].day == 1:
        return 'monthly_task'
    return 'daily_task'


branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
)
daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')
branch >> [daily_task, monthly_task]
```
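Because the branch callable is an ordinary Python function, its decision logic can be exercised without a running Airflow instance. The sketch below repeats `choose_branch` and calls it with a hand-built context. It assumes a plain `datetime` is an acceptable stand-in for the value Airflow passes as `data_interval_start`, since only its `.day` attribute is used.

```python
from datetime import datetime


def choose_branch(**context):
    # Same logic as the branch callable above
    if context["data_interval_start"].day == 1:
        return "monthly_task"
    return "daily_task"


# Hand-built contexts standing in for what Airflow would pass at runtime
first_of_month = choose_branch(data_interval_start=datetime(2024, 3, 1))
mid_month = choose_branch(data_interval_start=datetime(2024, 3, 15))
```

The returned strings must match the `task_id` values of tasks directly downstream of the branch task.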
**Custom Branch Operator:**
```python
from airflow.providers.standard.operators.branch import BaseBranchOperator


class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run extra branch on first day of month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None  # Skip all downstream tasks
```
### TaskGroups for Organization
TaskGroups help organize related tasks hierarchically:
```python
from airflow.sdk import task_group
from airflow.providers.standard.operators.empty import EmptyOperator


@task_group()
def data_processing_group():
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")
    extract >> transform >> load


@task_group()
def validation_group():
    validate_schema = EmptyOperator(task_id="validate_schema")
    validate_data = EmptyOperator(task_id="validate_data")
    validate_schema >> validate_data


# The lines below belong inside a DAG context (for example, a `with DAG(...)` block)
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
start >> data_processing_group() >> validation_group() >> end
```
### Edge Labeling
Add labels to dependency edges for clarity:
```python
from airflow.sdk import Label
# Inline labeling
my_task >> Label("When empty") >> other_task
# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
```
### LatestOnlyOperator
Skip tasks if not the latest DAG run:
```python
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator

with DAG(
    dag_id='latest_only_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
    schedule="@daily",
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = EmptyOperator(task_id='task1')
    # task1 is skipped on every run except the latest one
    latest_only >> task1
```