rabbitmq
Build messaging systems with RabbitMQ — queues, exchanges, routing, dead-letter queues, delayed messages, priority queues, and RPC patterns. Use when tasks involve task queues, background job processing, inter-service communication, request-reply patterns, or reliable message delivery with acknowledgments.
# RabbitMQ
Build reliable messaging and task queue systems for microservice architectures and background processing.
## Setup
```yaml
# docker-compose.yml: RabbitMQ with management UI
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
volumes:
  rabbitmq-data:
```
Management UI at http://localhost:15672 (admin/secret).
## Core Concepts
RabbitMQ routes messages through **exchanges** to **queues**. Producers send to exchanges, consumers read from queues. The exchange type determines routing logic:
- **Direct** — routes by exact routing key match
- **Fanout** — broadcasts to all bound queues (pub/sub)
- **Topic** — routes by pattern matching on routing key (`order.*`, `#.error`)
- **Headers** — routes by message header values
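Topic patterns are the least obvious of the four. In a binding key, `*` stands for exactly one dot-separated word and `#` stands for zero or more words. The function below is only a sketch of those matching rules for experimenting with binding keys; `topicMatches` is a hypothetical helper, not part of RabbitMQ or amqplib, and the broker's own implementation differs.

```typescript
// Sketch of topic-exchange matching semantics (not the broker's actual code).
// `*` matches exactly one word, `#` matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi: number, ki: number): boolean {
    // Pattern exhausted: it is a match only if the key is exhausted too
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // `#` either absorbs nothing (advance pattern) or one more word (advance key)
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  }

  return match(0, 0);
}

console.log(topicMatches('order.*', 'order.created'));     // true
console.log(topicMatches('order.*', 'order.created.eu'));  // false
console.log(topicMatches('#.error', 'db.replica.error'));  // true
```

Note that `order.*` does not match a bare `order`, while `#.error` does match a bare `error`, because `#` may consume zero words.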
## Producer and Consumer
### Node.js (amqplib)
```typescript
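// connection.ts: shared channel with a simple reconnect
import amqp, { Channel } from 'amqplib';

let channel: Channel | null = null;
let pending: Promise<Channel> | null = null; // De-duplicates concurrent callers

async function openChannel(): Promise<Channel> {
  const conn = await amqp.connect('amqp://admin:secret@localhost:5672');
  const ch = await conn.createChannel();

  // Prefetch 10: the broker delivers at most 10 unacked messages per consumer,
  // so one slow consumer cannot hoard the backlog
  await ch.prefetch(10);

  conn.on('error', (err) => console.error('RabbitMQ connection error:', err));
  conn.on('close', () => {
    console.error('RabbitMQ connection closed, reconnecting...');
    channel = null;
    // Retry once after 5s; the catch avoids an unhandled rejection if the broker is still down.
    // Consumers registered on the old channel are not restored automatically.
    setTimeout(() => getChannel().catch((err) => console.error('Reconnect failed:', err)), 5000);
  });

  channel = ch;
  return ch;
}

async function getChannel(): Promise<Channel> {
  if (channel) return channel;
  pending ??= openChannel().finally(() => { pending = null; });
  return pending;
}

export { getChannel };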
```
```typescript
// task-producer.ts: send tasks to a work queue
import { randomUUID } from 'node:crypto';
import { getChannel } from './connection';

/** Send a background task to the processing queue.
 *
 * @param queue - Target queue name.
 * @param task - Task payload.
 * @param priority - 0 (normal) to 9 (highest). Requires priority queue.
 */
async function sendTask(queue: string, task: object, priority = 0) {
  const ch = await getChannel();

  // Assert queue exists with desired properties (idempotent).
  // Every assertQueue call for this queue must pass identical arguments,
  // otherwise the broker closes the channel with PRECONDITION_FAILED.
  await ch.assertQueue(queue, {
    durable: true,                       // Survives broker restart
    arguments: {
      'x-max-priority': 10,              // Enable priority queue
      'x-dead-letter-exchange': 'dlx',   // Failed messages go here
      'x-message-ttl': 86400000,         // 24h TTL
    },
  });

  ch.sendToQueue(queue, Buffer.from(JSON.stringify(task)), {
    persistent: true,                    // Write to disk (survives restart)
    priority,
    contentType: 'application/json',
    timestamp: Math.floor(Date.now() / 1000),  // AMQP timestamps are in seconds
    messageId: randomUUID(),
  });
}

// Example: send an email task (placeholder address)
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'welcome',
  data: { name: 'Alex', activationLink: 'https://app.example.com/activate/abc123' },
});

// High-priority: password reset (skip ahead of marketing emails)
await sendTask('email-queue', {
  to: 'user@example.com',
  template: 'password-reset',
  data: { resetLink: 'https://app.example.com/reset/xyz789' },
}, 9);
```
```typescript
// task-consumer.ts: process tasks with acknowledgment
import { getChannel } from './connection';

/** Start consuming tasks from a queue.
 *
 * @param queue - Queue to consume from.
 * @param handler - Async function that processes each task.
 */
async function startWorker(queue: string, handler: (task: any) => Promise<void>) {
  const ch = await getChannel();

  // Must match the producer's declaration exactly, or the broker rejects it
  // with PRECONDITION_FAILED and closes the channel
  await ch.assertQueue(queue, {
    durable: true,
    arguments: {
      'x-max-priority': 10,
      'x-dead-letter-exchange': 'dlx',
      'x-message-ttl': 86400000,
    },
  });

  await ch.consume(queue, async (msg) => {
    if (!msg) return;  // null means the broker cancelled this consumer
    const msgId = msg.properties.messageId;

    try {
      // Parsing inside the try block so malformed JSON is handled like any other failure
      const task = JSON.parse(msg.content.toString());
      await handler(task);
      ch.ack(msg);  // Success: remove from queue
    } catch (err) {
      console.error(`Task ${msgId} failed:`, err);

      // Check if this has been retried too many times
      const retryCount = Number(msg.properties.headers?.['x-retry-count'] ?? 0);
      if (retryCount >= 3) {
        ch.nack(msg, false, false);  // requeue=false: dead-letter after 3 retries
      } else {
        // Republish a copy with an incremented retry count, then ack the original.
        // A nack here would also dead-letter the original and duplicate the task.
        ch.sendToQueue(queue, msg.content, {
          ...msg.properties,
          headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
        });
        ch.ack(msg);
      }
    }
  });

  console.log(`Worker started on queue: ${queue}`);
}

// Start processing emails (sendEmail is your own mailer function, not shown)
startWorker('email-queue', async (task) => {
  await sendEmail(task.to, task.template, task.data);
});
```
For Python, use `pika` with `BlockingConnection`, `basic_qos(prefetch_count=N)`, and `basic_consume` with `basic_ack`/`basic_nack` for message handling.
## Exchange Patterns
### Fanout (broadcast)
Every bound queue gets every message — useful for notifications, cache invalidation, audit logging:
```typescript
// fanout.ts: broadcast events to multiple consumers
import { getChannel } from './connection';

const ch = await getChannel();
await ch.assertExchange('user-events', 'fanout', { durable: true });

// Each service creates its own queue bound to the exchange
// Email service queue
await ch.assertQueue('user-events-email', { durable: true });
await ch.bindQueue('user-events-email', 'user-events', '');

// Analytics service queue
await ch.assertQueue('user-events-analytics', { durable: true });
await ch.bindQueue('user-events-analytics', 'user-events', '');

// Publish: all bound queues receive the message (the routing key is ignored)
ch.publish('user-events', '', Buffer.from(JSON.stringify({
  type: 'user.registered',
  userId: 'usr-456',
  email: 'user@example.com',  // placeholder address
})), { persistent: true });
```
### Topic (pattern routing)
Route messages to queues based on pattern matching:
```typescript
// topic.ts: route by pattern (order.created goes to fulfillment, every key goes to audit)
import { getChannel } from './connection';

const ch = await getChannel();
await ch.assertExchange('events', 'topic', { durable: true });
// Fulfillment only cares about new orders
await ch.assertQueue('fulfillment', { durable: true });
await ch.bindQueue('fulfillment', 'events', 'order.created');
// Audit log captures everything
await ch.assertQueue('audit-log', { durable: true });
await ch.bindQueue('audit-log', 'events', '#'); // # matches all routing keys
// Payment service handles payment events
await ch.assertQueue('payment-processing', { durable: true });
await ch.bindQueue('payment-processing', 'events', 'payment.*');
// Publish with routing key
ch.publish('events', 'order.created', Buffer.from(JSON.stringify({ orderId: 'ord-123' })));
ch.publish('events', 'payment.received', Buffer.from(JSON.stringify({ amount: 89.97 })));
```
## Dead-Letter Queue
Failed messages need a place to go for inspection and replay:
```typescript
// dlq.ts: dead-letter exchange and queue setup
import { getChannel } from './connection';

const ch = await getChannel();

// Dead-letter exchange and queue.
// Fanout is used so dead letters arrive regardless of routing key: a queue that
// sets only x-dead-letter-exchange keeps the message's original routing key,
// which a direct exchange bound with '' would silently drop.
await ch.assertExchange('dlx', 'fanout', { durable: true });
await ch.assertQueue('dead-letters', { durable: true });
await ch.bindQueue('dead-letters', 'dlx', '');

// Main queue: rejected or expired messages automatically route to the DLX
await ch.assertQueue('orders', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': '',
  },
});

// Monitor dead letters
await ch.consume('dead-letters', (msg) => {
  if (!msg) return;
  const reason = msg.properties.headers?.['x-death']?.[0]?.reason;
  console.error(`Dead letter (${reason}):`, msg.content.toString());
  // Alert, log to monitoring, or attempt manual replay
  ch.ack(msg);
});
```
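When inspecting dead letters it helps to look at the whole `x-death` header rather than only the first entry, since the broker adds one entry per queue and reason. The helper below is a small sketch of that; `summarizeDeaths` and `DeathEntry` are hypothetical names, and the interface lists only the fields this example reads.

```typescript
// One entry of the x-death header array, limited to the fields read here
interface DeathEntry {
  queue: string;   // Queue the message was dead-lettered from
  reason: string;  // e.g. 'rejected', 'expired', 'maxlen'
  count: number;   // How many times it died in that queue for that reason
}

/** Turn the x-death header into one readable line per entry. */
function summarizeDeaths(headers: Record<string, unknown> | undefined): string[] {
  const deaths = headers?.['x-death'];
  if (!Array.isArray(deaths)) return [];  // Never dead-lettered, or header missing
  return (deaths as DeathEntry[]).map((d) => `${d.queue}: ${d.reason} x${d.count}`);
}

console.log(summarizeDeaths({
  'x-death': [{ queue: 'orders', reason: 'rejected', count: 2 }],
}));  // [ 'orders: rejected x2' ]
```

In the consumer above, this would be called as `summarizeDeaths(msg.properties.headers)`.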
## Delayed Messages
Use the `rabbitmq_delayed_message_exchange` plugin. Assert an exchange with type `x-delayed-message`, then publish messages with `headers: { 'x-delay': delayMs }` to schedule future delivery.
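A minimal sketch of that setup follows, assuming the plugin is already enabled on the broker. The exchange name `delayed-tasks`, the `DelayChannel` interface, and both function names are illustrative assumptions; `DelayChannel` is a structural subset of the amqplib channel so the example type-checks on its own, and a channel from `getChannel()` is expected to fit it. The `x-delayed-type` argument tells the plugin which routing behaviour to apply once the delay has elapsed.

```typescript
// Only the two channel methods this sketch needs (structural subset of amqplib's Channel)
interface DelayChannel {
  assertExchange(name: string, type: string, options?: object): Promise<unknown>;
  publish(exchange: string, routingKey: string, content: Buffer, options?: object): boolean;
}

const DELAYED_EXCHANGE = 'delayed-tasks';  // Hypothetical exchange name

/** Declare the delayed exchange; routing after the delay behaves like a direct exchange. */
async function setupDelayedExchange(ch: DelayChannel): Promise<void> {
  await ch.assertExchange(DELAYED_EXCHANGE, 'x-delayed-message', {
    durable: true,
    arguments: { 'x-delayed-type': 'direct' },
  });
}

/** Publish a message that the broker holds for delayMs before routing it. */
function publishDelayed(
  ch: DelayChannel,
  routingKey: string,
  payload: object,
  delayMs: number,
): boolean {
  return ch.publish(DELAYED_EXCHANGE, routingKey, Buffer.from(JSON.stringify(payload)), {
    persistent: true,
    headers: { 'x-delay': delayMs },
  });
}
```

A queue still has to be bound to the exchange with the same routing key, for example `ch.bindQueue('reminders', 'delayed-tasks', 'reminder')`, before `publishDelayed(ch, 'reminder', { userId: 'usr-456' }, 60000)` delivers anything.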
## RPC (Request-Reply)
For request-reply, create an exclusive auto-delete reply queue, send with `correlationId` and `replyTo`, and consume the reply queue filtering by `correlationId`.
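The client side of that pattern could be sketched as below. `createRpcClient`, the `RpcChannel` interface, the queue name `rpc-requests`, and the 5 second timeout are all assumptions made for illustration; `RpcChannel` is a structural subset of the amqplib channel so the example stands alone. Replies are matched to callers through a map keyed by `correlationId`, and anything with an unknown id is ignored.

```typescript
import { randomUUID } from 'node:crypto';

// Only the channel methods this sketch needs (structural subset of amqplib's Channel)
interface RpcChannel {
  assertQueue(name: string, options?: object): Promise<{ queue: string }>;
  consume(
    queue: string,
    onMessage: (msg: { content: Buffer; properties: { correlationId?: string } } | null) => void,
    options?: object,
  ): Promise<unknown>;
  sendToQueue(queue: string, content: Buffer, options?: object): boolean;
}

async function createRpcClient(ch: RpcChannel, requestQueue: string) {
  const pending = new Map<string, (reply: Buffer) => void>();

  // Empty name: the broker generates one. Exclusive + auto-delete ties it to this client.
  const { queue: replyQueue } = await ch.assertQueue('', { exclusive: true, autoDelete: true });

  await ch.consume(replyQueue, (msg) => {
    if (!msg) return;
    const id = msg.properties.correlationId;
    const resolve = id ? pending.get(id) : undefined;
    if (!id || !resolve) return;  // Unknown or stale correlationId: ignore
    pending.delete(id);
    resolve(msg.content);
  }, { noAck: true });

  /** Send a request and resolve with the raw reply body, or reject on timeout. */
  return function call(payload: object, timeoutMs = 5000): Promise<Buffer> {
    return new Promise((resolve, reject) => {
      const correlationId = randomUUID();
      const timer = setTimeout(() => {
        pending.delete(correlationId);
        reject(new Error(`RPC timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      pending.set(correlationId, (reply) => { clearTimeout(timer); resolve(reply); });
      ch.sendToQueue(requestQueue, Buffer.from(JSON.stringify(payload)), {
        correlationId,
        replyTo: replyQueue,
      });
    });
  };
}
```

The server side consumes `rpc-requests` and answers with `ch.sendToQueue(msg.properties.replyTo, result, { correlationId: msg.properties.correlationId })`, so the client can pair each reply with its request.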