ml-pipeline-automation
Automate ML workflows with Airflow, Kubeflow, and MLflow. Use for reproducible pipelines, retraining schedules, and MLOps, or when encountering task failures, dependency errors, or experiment tracking issues.
# ML Pipeline Automation
Orchestrate end-to-end machine learning workflows from data ingestion to production deployment with production-tested Airflow, Kubeflow, and MLflow patterns.
## When to Use This Skill
Load this skill when:
- **Building ML Pipelines**: Orchestrating data → train → deploy workflows
- **Scheduling Retraining**: Setting up automated model retraining schedules
- **Experiment Tracking**: Tracking experiments, parameters, metrics across runs
- **MLOps Implementation**: Building reproducible, monitored ML infrastructure
- **Workflow Orchestration**: Managing complex multi-step ML workflows
- **Model Registry**: Managing model versions and deployment lifecycle
## Quick Start: ML Pipeline in 5 Steps
```bash
# 1. Install Airflow and MLflow (check for latest versions at time of use)
pip install apache-airflow==3.1.5 mlflow==3.7.0
# Note: These versions are current as of December 2025
# Check PyPI for latest stable releases: https://pypi.org/project/apache-airflow/

# 2. Initialize the Airflow metadata database
# (Airflow 3 removed `airflow db init`; `airflow db migrate` replaces it)
airflow db migrate

# 3. Create DAG file: dags/ml_training_pipeline.py
# (assumes ./dags is your configured DAGs folder)
mkdir -p dags
cat > dags/ml_training_pipeline.py << 'EOF'
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule='@daily',  # Airflow 3 removed `schedule_interval`
    start_date=datetime(2025, 1, 1)
)

def train_model(**context):
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    mlflow.set_tracking_uri('http://localhost:5000')
    mlflow.set_experiment('iris-training')

    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.sklearn.log_model(model, 'model')

train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
EOF

# 4. Start the MLflow tracking server and the Airflow services
mlflow server --host 127.0.0.1 --port 5000 &
airflow scheduler &
# Airflow 3 parses DAG files in a separate process
airflow dag-processor &
# `airflow api-server` replaces `airflow webserver` in Airflow 3
airflow api-server --port 8080 &

# 5. Unpause and trigger the pipeline (new DAGs are paused by default)
airflow dags unpause ml_training_pipeline
airflow dags trigger ml_training_pipeline
# Access UI: http://localhost:8080
```
**Result**: Working ML pipeline with experiment tracking in under 5 minutes.
## Core Concepts
### Pipeline Stages
1. **Data Collection** → Fetch raw data from sources
2. **Data Validation** → Check schema, quality, distributions
3. **Feature Engineering** → Transform raw data to features
4. **Model Training** → Train with hyperparameter tuning
5. **Model Evaluation** → Validate performance on test set
6. **Model Deployment** → Push to production if metrics pass
7. **Monitoring** → Track drift, performance in production
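
The seven stages above can be sketched as plain Python functions chained in order. This is a minimal, orchestrator-agnostic illustration: the function names, the toy rows, the mean-threshold "model", and the 0.8 accuracy gate are all assumptions made for the example, not part of any Airflow, Kubeflow, or MLflow API.

```python
from statistics import mean

ACCURACY_THRESHOLD = 0.8  # hypothetical deployment gate


def collect():
    """Stage 1: fetch raw data (hard-coded toy rows here)."""
    return [
        {"x": 1.0, "label": 0}, {"x": 2.0, "label": 0},
        {"x": 8.0, "label": 1}, {"x": 9.0, "label": 1},
    ]


def validate(rows):
    """Stage 2: fail fast on empty or malformed input."""
    if not rows:
        raise ValueError("no rows collected")
    for row in rows:
        if row.get("x") is None or row.get("label") not in (0, 1):
            raise ValueError(f"malformed row: {row}")
    return rows


def engineer(rows):
    """Stage 3: turn raw rows into (feature, label) pairs."""
    return [(row["x"], row["label"]) for row in rows]


def train(examples):
    """Stage 4: 'train' a threshold classifier at the feature mean."""
    threshold = mean(x for x, _ in examples)
    return lambda x: int(x > threshold)


def evaluate(model, examples):
    """Stage 5: accuracy on the given examples.

    A real pipeline would score a held-out test set instead.
    """
    hits = sum(model(x) == y for x, y in examples)
    return hits / len(examples)


def run_pipeline():
    examples = engineer(validate(collect()))
    model = train(examples)
    accuracy = evaluate(model, examples)
    # Stage 6: deploy only if the metric clears the gate.
    deployed = accuracy >= ACCURACY_THRESHOLD
    # Stage 7 (monitoring) would consume these values in production.
    return {"accuracy": accuracy, "deployed": deployed}


result = run_pipeline()
```

In an orchestrator, each function would typically become its own task so that a failure in one stage can be retried without rerunning the earlier ones.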
### Orchestration Tools Comparison
| Tool | Best For | Strengths |
|------|----------|-----------|
| **Airflow** | General ML workflows | Mature, flexible, Python-native |
| **Kubeflow** | Kubernetes-native ML | Container-based, scalable |
| **MLflow** | Experiment tracking | Model registry, versioning |
| **Prefect** | Modern Python workflows | Dynamic DAGs, native caching |
| **Dagster** | Asset-oriented pipelines | Data-aware, testable |
## Basic Airflow DAG
```python
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email': ['ml-team@example.com'],  # placeholder address
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training pipeline',
    schedule='@daily',  # Airflow 3 removed `schedule_interval`
    start_date=datetime(2025, 1, 1),
    catchup=False
)

def validate_data(**context):
    """Validate input data quality."""
    import pandas as pd

    data_path = "/data/raw/latest.csv"
    df = pd.read_csv(data_path)

    # Validation checks (explicit raises still run under `python -O`,
    # which strips assert statements)
    if len(df) <= 1000:
        raise ValueError(f"Insufficient data: {len(df)} rows")
    if df.isnull().sum().sum() >= len(df) * 0.1:
        raise ValueError("Too many nulls")

    # Share the validated path with downstream tasks
    context['ti'].xcom_push(key='data_path', value=data_path)
    logger.info(f"Data validation passed: {len(df)} rows")

def train_model(**context):
    """Train ML model with MLflow tracking."""
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier

    data_path = context['ti'].xcom_pull(key='data_path', task_ids='validate_data')

    mlflow.set_tracking_uri('http://mlflow:5000')
    mlflow.set_experiment('production-training')

    with mlflow.start_run():
        # Training logic here: load features and labels from data_path
        model = RandomForestClassifier(n_estimators=100)
        # model.fit(X, y) ...
        mlflow.log_param('n_estimators', 100)
        mlflow.sklearn.log_model(model, 'model')

validate = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

# Run validation before training
validate >> train
```
## Known Issues Prevention
### 1. Task Failures Without Alerts
**Problem**: Pipeline fails silently, no one notices until users complain.
**Solution**: Configure email/Slack alerts on failure:
```python
from airflow.providers.standard.operators.python import PythonOperator

default_args = {
    'email': ['ml-team@example.com'],  # placeholder address
    'email_on_failure': True,
    'email_on_retry': False
}

def on_failure_callback(context):
    """Send Slack alert on failure."""
    from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

    # .get() avoids a KeyError when a key is absent from the context
    slack_msg = f"""
:red_circle: Task Failed: {context['task_instance'].task_id}
DAG: {context['task_instance'].dag_id}
Execution Date: {context.get('ds')}
Error: {context.get('exception')}
"""
    SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_webhook',
        message=slack_msg
    ).execute(context)

# `my_function` and `dag` are assumed to be defined elsewhere in the DAG file
task = PythonOperator(
    task_id='critical_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    dag=dag
)
```
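
Keeping the message formatting separate from the code that sends it makes the alert text testable without Airflow or Slack. The sketch below assumes the same context keys used in the callback above (`task_instance`, `ds`, `exception`); `build_failure_message` is a hypothetical helper, and `SimpleNamespace` merely stands in for a real task instance.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Format an alert from an Airflow-style callback context dict."""
    ti = context["task_instance"]
    return "\n".join([
        f":red_circle: Task Failed: {ti.task_id}",
        f"DAG: {ti.dag_id}",
        # .get() supplies a fallback if a key is absent from the context.
        f"Run date: {context.get('ds', 'n/a')}",
        f"Error: {context.get('exception', 'unknown')}",
    ])


# Stand-in context for local testing; a real one is supplied by Airflow.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="train_model", dag_id="ml_training_pipeline"
    ),
    "exception": ValueError("bad input"),
}
message = build_failure_message(fake_context)
```

The failure callback can then pass the returned string to whichever notifier is configured.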
### 2. Missing XCom Data Between Tasks
**Problem**: Task expects XCom value from previous task, gets None, crashes.
**Solution**: Always validate XCom pulls:
```python
def process_data(**context):
    data_path = context['ti'].xcom_pull(
        key='data_path',
        task_ids='upstream_task'
    )
    if data_path is None:
        raise ValueError("No data_path from upstream_task - check XCom push")
    # Process data...
```
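
If several tasks need the same check, it can be factored into a small helper and exercised without a running scheduler. This is a sketch under stated assumptions: `require_xcom` and `FakeTaskInstance` are hypothetical names, and the fake only mimics the `xcom_push`/`xcom_pull` calls used in this document.

```python
class FakeTaskInstance:
    """Minimal stand-in for Airflow's task instance (test use only)."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, key, task_ids=None):
        # The real method also filters by task_ids; this stub ignores it.
        return self._store.get(key)


def require_xcom(ti, key, task_ids):
    """Pull an XCom value and raise a descriptive error if it is missing."""
    value = ti.xcom_pull(key=key, task_ids=task_ids)
    if value is None:
        raise ValueError(f"No XCom '{key}' from task '{task_ids}'")
    return value


ti = FakeTaskInstance()
ti.xcom_push(key="data_path", value="/data/raw/latest.csv")
data_path = require_xcom(ti, "data_path", "validate_data")
```

Inside a task, the same helper would be called as `require_xcom(context['ti'], 'data_path', 'validate_data')`.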
### 3. DAG Not Appearing in UI
**Problem**: DAG file exists in `dags/` but doesn't show in Airflow UI.
**Solution**: Check DAG parsing errors:
```bash
# Check for syntax and import errors by running the file directly
python dags/my_dag.py

# List import errors recorded by Airflow
airflow dags list-import-errors

# View DAG import errors in UI
# Navigate to: Browse → DAG Import Errors

# Common fixes:
# 1. Ensure DAG object is defined in file
# 2. Check for circular imports
# 3. Verify all dependencies installed
# 4. Fix syntax errors
```
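
To check a whole folder at once, the syntax check can be scripted with the standard library. The sketch below uses a hypothetical helper, `find_syntax_errors`, and only catches syntax errors; missing dependencies, circular imports, and files that never define a DAG object still need the checks listed above.

```python
import ast
from pathlib import Path


def find_syntax_errors(dags_dir):
    """Return {filename: error message} for DAG files that fail to parse."""
    errors = {}
    for path in sorted(Path(dags_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors
```

Running `find_syntax_errors("dags")` in CI before deployment gives an early signal, because parsing does not execute the DAG files.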
### 4. Hardcoded Paths Break in Production
**Problem**: Paths like `/Users/myname/data/` work locally, fail in production.
**Solution**: Use Airflow Variables or environment variables:
```python
import os

from airflow.models import Variable

def load_data(**context):
    # ❌ Bad: Hardcoded path
    # data_path = "/Users/myname/data/train.csv"

    # ✅ Good: Use Airflow Variable (second argument is the default)
    data_dir = Variable.get("data_directory", "/data")
    data_path = f"{data_dir}/train.csv"

    # Or use environment variable instead of the two lines above
    # data_path = os.getenv("DATA_PATH", "/data/train.csv")

    return data_path
```
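
The environment-variable approach can be wrapped in a small resolver so every task builds paths the same way. This is a minimal sketch: `resolve_data_path` and the `DATA_DIR` variable name are assumptions made for the example, not Airflow settings.

```python
import os
from pathlib import Path


def resolve_data_path(filename, env_var="DATA_DIR", default_dir="/data"):
    """Build a data path from an environment variable, with a fallback."""
    base = os.environ.get(env_var, default_dir)
    return Path(base) / filename
```

With `DATA_DIR` unset, `resolve_data_path("train.csv")` points at `/data/train.csv`; setting `DATA_DIR` in the deployment environment redirects every caller without code changes.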
### 5. Stuck Tasks Consume Resources
**Problem**: Task hangs indefinitely, blocks worker slot, wastes resources.
**Solution**: Set `execution_timeout` on each task so that Airflow fails the task once it runs past the limit, which frees the worker slot.
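
One way to keep timeouts consistent is to define them in a single place and pass them to each operator as keyword arguments. `execution_timeout` is a standard operator argument that takes a `timedelta`; the `TASK_TIMEOUTS` table, the `timeout_kwargs` helper, and the specific limits below are hypothetical and should be tuned to observed runtimes.

```python
from datetime import timedelta

# Hypothetical per-task limits; tune to observed runtimes.
TASK_TIMEOUTS = {
    "validate_data": timedelta(minutes=10),
    "train_model": timedelta(hours=2),
}


def timeout_kwargs(task_id, default=timedelta(minutes=30)):
    """Return operator kwargs that bound how long a task may run."""
    return {"execution_timeout": TASK_TIMEOUTS.get(task_id, default)}


# Usage inside a DAG file (requires Airflow, so shown as a comment):
# train = PythonOperator(
#     task_id="train_model",
#     python_callable=train_model,
#     dag=dag,
#     **timeout_kwargs("train_model"),
# )
```

A task that exceeds its limit is marked failed and, if `retries` is set, retried like any other failure.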