# APScheduler
APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.
## Quick Start
### Basic Synchronous Scheduler
```python
from datetime import datetime
from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger

def tick():
    print(f"Tick: {datetime.now()}")

# Create and start a scheduler with the default in-memory data store
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()
```
### Async Scheduler with FastAPI
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

def cleanup_task():
    print("Running cleanup task...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task,
            IntervalTrigger(hours=1),
            id="cleanup"
        )
        await scheduler.start_in_background()
        # The scheduler keeps running for as long as the app is serving
        yield

app = FastAPI(lifespan=lifespan)
```
## Common Patterns
### Schedulers
**In-memory scheduler (development):**
```python
from apscheduler import AsyncScheduler

async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
**Persistent scheduler (production):**
```python
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore

async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)
    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
**Distributed scheduler:**
```python
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# data_store and event_broker are assumed to be created elsewhere
# (see the Persistence section) and shared by every node

# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()

# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store,
        event_broker,
        role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()
```
### Jobs
**Simple function jobs:**
```python
from apscheduler.triggers.cron import CronTrigger

def send_daily_report():
    generate_report()
    email_report("[email protected]")

scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # 9 AM daily
)
```
**Jobs with arguments:**
```python
from apscheduler.triggers.interval import IntervalTrigger

def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass

# kwargs are passed to process_data on every run
scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000
    }
)
```
**Async jobs:**
```python
import aiohttp
from apscheduler.triggers.interval import IntervalTrigger

async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)

scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)
)
```
### Triggers
**Interval trigger:**
```python
from apscheduler.triggers.interval import IntervalTrigger
# Every 30 seconds
IntervalTrigger(seconds=30)
# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)
# Every 3 days
IntervalTrigger(days=3)
```
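The keyword arguments combine into one fixed period, much like `datetime.timedelta`. The standard-library sketch below does not use APScheduler; `next_fire_times` is a hypothetical helper, and it assumes the first run happens at the start time. It lists the run times that an interval of 2 hours and 15 minutes would produce.

```python
from datetime import datetime, timedelta


def next_fire_times(start: datetime, interval: timedelta, count: int) -> list:
    """Return the first `count` run times of a fixed interval beginning at `start`."""
    return [start + i * interval for i in range(count)]


# Equivalent period to IntervalTrigger(hours=2, minutes=15)
interval = timedelta(hours=2, minutes=15)
fire_times = next_fire_times(datetime(2024, 1, 1, 9, 0), interval, 3)
for fire_time in fire_times:
    print(fire_time.isoformat())
```

Because the period is a fixed duration, the wall-clock time of each run drifts relative to calendar boundaries, which is the main difference from the cron and calendar interval triggers.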
**Cron trigger:**
```python
from apscheduler.triggers.cron import CronTrigger
# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')
# Every 15 minutes
CronTrigger(minute='*/15')
# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)
# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5') # 9 AM weekdays
```
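To make expressions such as `*/15` concrete, here is a small standard-library sketch that expands one numeric cron field into the values it matches. It is an illustration only, not APScheduler's parser: `expand_cron_field` is a hypothetical helper, and it handles plain numbers, ranges, lists and steps but not names or keywords like `last`.

```python
def expand_cron_field(expr: str, low: int, high: int) -> list:
    """Expand a numeric cron field such as '*/15', '9-17/4' or '0,30'."""
    values = set()
    for part in expr.split(","):
        rng, _, step_str = part.partition("/")
        step = int(step_str) if step_str else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            first, last = rng.split("-")
            start, end = int(first), int(last)
        else:
            start = int(rng)
            # A bare value with a step ("5/10") runs up to the field maximum
            end = high if step_str else start
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_cron_field("*/15", 0, 59))    # minute field
print(expand_cron_field("9-17/4", 0, 23))  # hour field
```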
**Date trigger (one-time):**
```python
from datetime import datetime, timedelta
from apscheduler.triggers.date import DateTrigger
# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)
# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
```
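A naive `datetime` carries no timezone, so the moment it refers to depends on how it is interpreted. One way to avoid the ambiguity is to build timezone-aware values before handing them to a trigger. The sketch below uses only the standard library; the fixed UTC+2 offset is an arbitrary example, not something the examples above prescribe.

```python
from datetime import datetime, timedelta, timezone

# Five minutes from now, expressed unambiguously in UTC
run_time_utc = datetime.now(timezone.utc) + timedelta(minutes=5)

# A specific wall-clock time in a fixed UTC+2 offset (example offset)
utc_plus_2 = timezone(timedelta(hours=2))
new_year_eve = datetime(2024, 12, 31, 23, 59, 59, tzinfo=utc_plus_2)

print(run_time_utc.isoformat())
print(new_year_eve.astimezone(timezone.utc).isoformat())
```

Either value could then be passed as `run_time`, assuming the trigger accepts aware datetimes as it does naive ones.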
**Calendar interval:**
```python
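from datetime import date
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
# First day of every month at 9 AM (the day of month comes from start_date)
CalendarIntervalTrigger(months=1, hour=9, minute=0, start_date=date(2024, 1, 1))
# Every Monday at 10 AM (the weekday comes from start_date; 2024-01-01 is a Monday)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2024, 1, 1))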
```
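The difference between a calendar interval and a fixed interval is easiest to see side by side. This standard-library sketch is not APScheduler code, and `add_months` is a hypothetical helper. It compares stepping by one calendar month with stepping by a fixed 30 days.

```python
from datetime import date, timedelta


def add_months(d: date, months: int) -> date:
    """Shift `d` by whole calendar months, keeping the day of month.

    Raises ValueError if that day does not exist in the target month.
    """
    month_index = d.year * 12 + (d.month - 1) + months
    year, month0 = divmod(month_index, 12)
    return d.replace(year=year, month=month0 + 1)


start = date(2024, 1, 1)
calendar_dates = [add_months(start, i) for i in range(4)]
fixed_dates = [start + timedelta(days=30 * i) for i in range(4)]

print([d.isoformat() for d in calendar_dates])
print([d.isoformat() for d in fixed_dates])
```

The calendar steps stay on the first of each month, while the fixed 30-day steps drift against month boundaries.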
### Persistence
**SQLite:**
```python
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)
```
**PostgreSQL:**
```python
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
```
**Redis (event broker):**
```python
from apscheduler.eventbrokers.redis import RedisEventBroker
event_broker = RedisEventBroker.from_url("redis://localhost:6379")
```
### Job Management
**Get job results:**
```python
from datetime import timedelta
from apscheduler import AsyncScheduler

async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()
        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1)
        )
        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")
```
**Schedule management:**
```python
# Pause schedule
await scheduler.pause_schedule("my_schedule")
# Resume schedule
await scheduler.unpause_schedule("my_schedule")
# Remove schedule
await scheduler.remove_schedule("my_schedule")
# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
```
**Event handling:**
```python
from apscheduler import JobOutcome, JobReleased

def on_job_completed(event: JobReleased):
    if event.outcome == JobOutcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        # The exception detail fields on JobReleased may differ between
        # releases, so only the outcome is reported here
        print(f"Job {event.job_id} failed: {event.outcome.name}")

scheduler.subscribe(on_job_completed, JobReleased)
```
## Configuration
**Task defaults:**
```python
from datetime import timedelta
from apscheduler import AsyncScheduler, TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5)
)
scheduler = AsyncScheduler(task_defaults=task_defaults)
```
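`misfire_grace_time` bounds how late a job may start and still be run. The check can be pictured with the standard-library sketch below. It is an illustration under that assumption, not APScheduler's implementation: `is_misfire` is a hypothetical helper, and treating `None` as "no limit" is also an assumption.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_misfire(scheduled: datetime, now: datetime, grace: Optional[timedelta]) -> bool:
    """Return True if a job scheduled for `scheduled` is too late to start at `now`."""
    if grace is None:
        # Assumption: no grace time configured means lateness is never a misfire
        return False
    return now - scheduled > grace


scheduled = datetime(2024, 1, 1, 9, 0)
grace = timedelta(minutes=5)

print(is_misfire(scheduled, datetime(2024, 1, 1, 9, 3), grace))  # 3 minutes late
print(is_misfire(scheduled, datetime(2024, 1, 1, 9, 6), grace))  # 6 minutes late
```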
**Job execution options:**
```python
from datetime import timedelta
from apscheduler import CoalescePolicy

# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10)
)
# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest
)
```
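The `coalesce` option matters when several runs of one schedule have piled up, for example after downtime. The sketch below illustrates the idea with three policies named `earliest`, `latest` and `all`. It is not APScheduler's code: `Coalesce` and `coalesce_fire_times` are hypothetical stand-ins, and the exact semantics in the library are an assumption here.

```python
from datetime import datetime
from enum import Enum


class Coalesce(Enum):
    earliest = "earliest"
    latest = "latest"
    all = "all"


def coalesce_fire_times(pending: list, policy: Coalesce) -> list:
    """Pick which of the accumulated run times (sorted ascending) turn into jobs."""
    if not pending:
        return []
    if policy is Coalesce.earliest:
        return [pending[0]]
    if policy is Coalesce.latest:
        return [pending[-1]]
    return list(pending)


# Three hourly runs that accumulated while the scheduler was down
missed = [datetime(2024, 1, 1, hour) for hour in (9, 10, 11)]
for policy in Coalesce:
    print(policy.name, [t.hour for t in coalesce_fire_times(missed, policy)])
```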
## Requirements
```bash
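# Core package (the examples here use the 4.x API; if your resolver
# picks a 3.x release, request a 4.x version explicitly)
uv add apscheduler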
# Database backends
uv add "apscheduler[postgresql]" # PostgreSQL
uv add "apscheduler[mongodb]" # MongoDB
uv add "apscheduler[sqlite]" # SQLite
# Event brokers
uv add "apscheduler[redis]" # Redis
uv add "apscheduler[mqtt]" # MQTT
```
**Dependencies by use case:**
- **Basic scheduling**: `apscheduler`
- **PostgreSQL persistence**: `asyncpg`, `sqlalchemy`
- **Redis distributed**: `redis`