routing-strategies
Task routing and queue management patterns for Celery including priority queues, topic exchanges, worker-specific routing, and advanced queue configurations. Use when configuring task routing, managing queues, setting up priority queues, implementing worker routing, configuring topic exchanges, or when user mentions task routing, queue management, Celery routing, worker assignments, or message broker routing.
# Celery Task Routing Strategies Skill
This skill provides templates, scripts, and patterns for task routing and queue management in Celery applications, including priority queues, topic-based routing, and worker-specific queue assignments.
## Overview
Effective task routing is crucial for:
1. **Performance Optimization** - Route compute-intensive tasks to dedicated workers
2. **Priority Management** - High-priority tasks bypass slower queues
3. **Resource Isolation** - Separate critical operations from background jobs
4. **Scalability** - Independent scaling of different task types
This skill covers routing with RabbitMQ, Redis, and custom broker configurations.
## Available Scripts
### 1. Test Routing Configuration
**Script**: `scripts/test-routing.sh <config-file>`
**Purpose**: Validates routing configuration and tests queue connectivity
**Checks**:
- Broker connectivity (RabbitMQ/Redis)
- Queue declarations
- Exchange configurations
- Routing key patterns
- Worker queue bindings
- Priority queue setup
**Usage**:
```bash
# Test routing configuration
./scripts/test-routing.sh ./celery_config.py
# Test with custom broker URL
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py
# Verbose output
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py
```
**Exit Codes**:
- `0`: All routing tests passed
- `1`: Configuration errors detected
- `2`: Broker connection failed
### 2. Validate Queue Configuration
**Script**: `scripts/validate-queues.sh <project-dir>`
**Purpose**: Validates queue setup across application code
**Checks**:
- Task decorators use valid queues
- No hardcoded queue names (use config)
- All queues defined in routing configuration
- Priority settings are valid (0-255)
- Exchange types match routing patterns
- Worker configurations reference valid queues
**Usage**:
```bash
# Validate current project
./scripts/validate-queues.sh .
# Validate specific directory
./scripts/validate-queues.sh /path/to/celery-app
# Generate detailed report
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md
```
**Exit Codes**:
- `0`: Validation passed
- `1`: Validation failed (must fix issues)
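Two of the checks listed above (every routed queue is defined, and priorities fall within 0-255) can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the contents of `validate-queues.sh`; the function name `find_queue_problems` and the sample data are hypothetical.

```python
def find_queue_problems(routes, queue_names, max_priority=255):
    """Return human-readable problems found in a task_routes-style mapping."""
    problems = []
    for task_name, route in routes.items():
        # Check 1: the route must point at a queue that is actually defined.
        queue = route.get('queue')
        if queue is not None and queue not in queue_names:
            problems.append(f"{task_name}: unknown queue {queue!r}")
        # Check 2: a priority, if present, must be an integer in 0..max_priority.
        priority = route.get('priority')
        if priority is not None and not (
            isinstance(priority, int) and 0 <= priority <= max_priority
        ):
            problems.append(f"{task_name}: invalid priority {priority!r}")
    return problems


# Hypothetical sample data, for illustration only.
queue_names = {'default', 'emails', 'high_priority'}
routes = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 300},
    'myapp.tasks.cleanup': {'queue': 'maintenance'},
}

for problem in find_queue_problems(routes, queue_names):
    print(problem)
```

An empty result corresponds to exit code `0`; any reported problem corresponds to exit code `1`.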
## Available Templates
### 1. Basic Queue Configuration
**Template**: `templates/queue-config.py`
**Features**:
- Default queue setup
- Named queues for different task types
- Queue-to-exchange bindings
- Priority settings
- Worker routing configuration
**Usage**:
```python
from celery import Celery
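# Assumes the template has been copied to an importable module name
# (queue_config.py); a file named queue-config.py cannot be imported directly.
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES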
app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES
```
**Configuration Example**:
```python
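from kombu import Exchange, Queue

# Wildcard bindings such as 'email.*' only work on topic exchanges;
# the default exchange type is direct, which matches routing keys literally.
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails', type='topic'), routing_key='email.*'),
    Queue('reports', Exchange('reports', type='topic'), routing_key='report.*'),
)

CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    # On RabbitMQ, 'priority' only takes effect if the target queue
    # declares an x-max-priority argument (see Priority Queue Setup).
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}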
```
### 2. Dynamic Routing Rules
**Template**: `templates/routing-rules.py`
**Features**:
- Pattern-based routing
- Conditional routing logic
- Dynamic queue selection
- Routing by task name patterns
- Routing by task arguments
**Key Functions**:
```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """
    Dynamic routing based on task name or arguments
    """
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}
    # kwargs may be None when the task is sent without keyword arguments
    if (kwargs or {}).get('priority') == 'high':
        return {'queue': 'high_priority'}
    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}
    return {'queue': 'default'}


# `app` is the Celery application instance
app.conf.task_routes = (route_task,)
```
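Routing by task name patterns can also be driven by a table instead of a chain of `if` statements. The following is a minimal sketch, assuming shell-style globs via the standard library's `fnmatch`; the names `ROUTE_PATTERNS`, `DEFAULT_ROUTE`, and `route_by_pattern` are hypothetical and not part of the template.

```python
from fnmatch import fnmatchcase

# Hypothetical pattern table: the first matching pattern wins, so order matters.
ROUTE_PATTERNS = [
    ('urgent.*', {'queue': 'high_priority', 'priority': 9}),
    ('email.*', {'queue': 'emails'}),
    ('report.*', {'queue': 'reports'}),
]
DEFAULT_ROUTE = {'queue': 'default'}


def route_by_pattern(name, args, kwargs, options, task=None, **kw):
    """Router with the same signature as route_task, driven by a pattern table."""
    for pattern, route in ROUTE_PATTERNS:
        # fnmatchcase is case-sensitive; '*' matches any characters, dots included.
        if fnmatchcase(name, pattern):
            return dict(route)  # copy so callers cannot mutate the table
    return dict(DEFAULT_ROUTE)


# The router can be exercised by hand, without a broker:
print(route_by_pattern('email.send_welcome', (), {}, {}))
print(route_by_pattern('myapp.tasks.cleanup', (), {}, {}))
```

It would be registered the same way as `route_task`, for example `app.conf.task_routes = (route_by_pattern,)`. Keeping the table as an ordered list makes the precedence between overlapping patterns explicit.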
### 3. Priority Queue Setup
**Template**: `templates/priority-queues.py`
**Features**:
- Multi-level priority queues (0-255)
- Priority inheritance
- Default priority configuration
- Queue priority enforcement
**Priority Levels**:
```python
from kombu import Exchange, Queue

# Priority queue configuration (x-max-priority is a RabbitMQ queue argument)
CELERY_QUEUES = (
    Queue('critical', Exchange('tasks'), routing_key='critical',
          queue_arguments={'x-max-priority': 10}),
    Queue('high', Exchange('tasks'), routing_key='high',
          queue_arguments={'x-max-priority': 10}),
    Queue('normal', Exchange('tasks'), routing_key='normal',
          queue_arguments={'x-max-priority': 10}),
    Queue('low', Exchange('tasks'), routing_key='low',
          queue_arguments={'x-max-priority': 10}),
)

# Task priority mapping (values must not exceed x-max-priority)
PRIORITY_LEVELS = {
    'critical': 10,  # Highest priority
    'high': 7,
    'normal': 5,
    'low': 2,
}

# Apply priority to task (`app` is the Celery application instance)
@app.task(priority=PRIORITY_LEVELS['high'])
def urgent_processing():
    pass
```
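How the pieces above interact can be sketched in plain Python: a priority passed at call time (for example `urgent_processing.apply_async(priority=9)`) takes precedence over the task-level default, and RabbitMQ treats any priority above the queue's `x-max-priority` as that maximum. The helper `effective_priority` is hypothetical, and its `default` parameter stands in for whatever application-wide default is configured.

```python
def effective_priority(call_priority, task_priority, queue_max, default=0):
    """Resolve the priority a message would effectively get on a RabbitMQ queue.

    call_priority: value passed at call time, or None
    task_priority: value set on the task definition, or None
    queue_max:     the queue's x-max-priority argument
    """
    if call_priority is not None:
        requested = call_priority      # runtime option wins
    elif task_priority is not None:
        requested = task_priority      # fall back to the task definition
    else:
        requested = default            # no priority given anywhere
    # Priorities above the queue maximum are treated as the maximum.
    return min(requested, queue_max)


print(effective_priority(None, 7, queue_max=10))   # task-level default applies
print(effective_priority(200, 7, queue_max=10))    # capped at the queue maximum
```

This is why the values in `PRIORITY_LEVELS` stay within 0-10 when the queues declare `x-max-priority: 10`.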
### 4. Topic Exchange Routing
**Template**: `templates/topic-exchange.py`
**Features**:
- Topic-based routing patterns
- Wildcard routing keys
- Multi-queue routing
- Pattern matching
**Topic Patterns**:
```python
from kombu import Exchange, Queue
# Topic exchange setup
task_exchange = Exchange('tasks', type='topic', durable=True)
CELERY_QUEUES = (
# Match specific patterns
Queue('user.notifications', exchange=task_exchange,
routing_key='user.notification.*'),
# Match all email types
Queue('emails', exchange=task_exchange,
routing_key='email.#'),
# Match processing tasks
Queue('processing', exchange=task_exchange,
routing_key='*.processing.*'),
# Match report keys with exactly one word after 'report'
# (use 'report.#' to match reports at any depth)
Queue('reports', exchange=task_exchange,
routing_key='report.*'),
)
# Routing configuration
CELERY_ROUTES = {
'myapp.tasks.send_welcome_email': {
'exchange': 'tasks',
'routing_key': 'email.welcome.send'
},
'myapp.tasks.notify_user': {
'exchange': 'tasks',
'routing_key': 'user.notification.send'
},
}
```
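To reason about which queue a routing key reaches, it can help to model topic matching without a broker. The sketch below assumes the standard AMQP topic rules (`*` matches exactly one dot-separated word, `#` matches zero or more words) and reuses the binding patterns from the configuration above; `topic_matches`, `queues_for`, and `BINDINGS` are hypothetical names, and the broker performs the real matching.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP-style topic binding pattern."""
    p_words = pattern.split('.')
    k_words = routing_key.split('.')

    def match(i, j):
        if i == len(p_words):
            return j == len(k_words)          # pattern used up: key must be too
        if p_words[i] == '#':
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, jj) for jj in range(j, len(k_words) + 1))
        if j == len(k_words):
            return False                      # key used up, pattern is not
        if p_words[i] == '*' or p_words[i] == k_words[j]:
            return match(i + 1, j + 1)        # '*' or literal word matched
        return False

    return match(0, 0)


# Queue name -> binding pattern, copied from the configuration above.
BINDINGS = {
    'user.notifications': 'user.notification.*',
    'emails': 'email.#',
    'processing': '*.processing.*',
    'reports': 'report.*',
}


def queues_for(routing_key):
    """List the queues whose binding pattern matches the routing key."""
    return [q for q, pattern in BINDINGS.items() if topic_matches(pattern, routing_key)]


print(queues_for('email.welcome.send'))
print(queues_for('report.sales.monthly'))
```

Under these rules `email.welcome.send` reaches only `emails`, while a two-level key such as `report.sales.monthly` matches no binding because `report.*` accepts exactly one word after `report`.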
### 5. Worker-Specific Routing
**Template**: `templates/worker-routing.py`
**Features**:
- Dedicated worker pools
- Worker-specific queues
- CPU vs I/O task separation
- Geographic routing
- Resource-based routing
**Worker Configuration**:
```python
# Worker pool definitions
WORKER_POOLS = {
'cpu_intensive': {
'queues': ['ml_training', 'video_processing', 'data_analysis'],
'concurrency': 4,
'prefetch_multiplier': 1,
},
'io_intensive': {
'queues': ['api_calls', 'file_uploads', 'emails'],
'concurrency': 50,
'prefetch_multiplier': 10,
},
'general': {
'queues': ['default', 'background'],
'concurrency': 10,
'prefetch_multiplier': 4,
},
}
# Start workers
# celery -A myapp worker --queues=ml_training,video_processing -c 4 -n cpu_worker@%h
# celery -A myapp worker --queues=api_calls,file_uploads -c 50 -n io_worker@%h
```
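The start commands in the comments above can be derived from a pool definition rather than maintained by hand. This is a minimal sketch; `worker_command` is a hypothetical helper, the pool data repeats one entry from `WORKER_POOLS`, and naming each worker after its pool is an assumption (the commands above use `cpu_worker` and `io_worker`).

```python
import shlex


def worker_command(app_name, pool_name, pool):
    """Build the argument list for one `celery worker` process."""
    return [
        'celery', '-A', app_name, 'worker',
        f"--queues={','.join(pool['queues'])}",
        '-c', str(pool['concurrency']),
        f"--prefetch-multiplier={pool['prefetch_multiplier']}",
        '-n', f"{pool_name}@%h",   # %h expands to the hostname at worker start
    ]


# One pool copied from the configuration above, for illustration.
cpu_pool = {
    'queues': ['ml_training', 'video_processing', 'data_analysis'],
    'concurrency': 4,
    'prefetch_multiplier': 1,
}

print(shlex.join(worker_command('myapp', 'cpu_intensive', cpu_pool)))
```

Returning a list rather than a string keeps the result usable with `subprocess.run` without shell quoting concerns.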
## Available Examples
### 1. Priority Queue Setup Guide
**Example**: `examples/priority-queue-setup.md`
**Demonstrates**:
- Configuring RabbitMQ priority queues
- Setting task priorities
- Priority inheritance patterns
- Testing priority routing
- Monitoring priority queue performance
**Key Concepts**:
- Priority range on RabbitMQ: 0 (lowest) to 255 (highest); a message priority above the queue's `x-max-priority` is treated as that maximum
- RabbitMQ `x-max-priority` argument
- Priority at task definition vs runtime
- Queue argument configuration
### 2. Topic-Based Routing Implementation
**Example**: `examples/topic-routing.md`
**Demonstrates**:
- Topic exchange setup
- Routing key patterns (* and # wildcards)
- Multi-queue routing
- Pattern matching strategies
- Consumer binding patterns
**Routing Key Patterns**:
- `*` - Matches exactly one word
- `#` - Matches zero or more words
- Example: `email.*.send` matches `email.welcome.send`