---
name: "APScheduler"
description: "Skill for APScheduler — auto-generated from documentation"
version: "1.0.0"
author: "skynet"
category: "dev"
agents: ["claude-code", "codex", "gemini"]
tags: ["apscheduler", "dev", "auto-generated"]
---
# APScheduler
---
name: APScheduler
description: Advanced Python Scheduler for managing jobs with different schedulers, executors, and job stores. Use when you need to schedule tasks in Python applications - from simple cron-like jobs to complex distributed task scheduling.
metadata:
  author: skynet
  version: 1.0.0
  category: dev
---
# APScheduler (Advanced Python Scheduler)
## Installation & Setup
```bash
# Basic installation
pip install apscheduler
# With database (SQLAlchemy job store) support
pip install "apscheduler[sqlalchemy]"
# Timezone handling needs no extra; it ships with the base package
# With several optional backends
pip install "apscheduler[sqlalchemy,redis,rethinkdb]"
```
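The scheduler classes and `add_job` calls used throughout this file follow the APScheduler 3.x API; the 4.x pre-releases expose a different interface. A minimal sketch for checking which major version is installed, using only the standard library (the helper name `apscheduler_major_version` is illustrative, not part of APScheduler):

```python
from importlib.metadata import PackageNotFoundError, version


def apscheduler_major_version():
    """Return the installed APScheduler major version, or None if it is absent."""
    try:
        installed = version("apscheduler")
    except PackageNotFoundError:
        return None
    return int(installed.split(".")[0])


major = apscheduler_major_version()
if major is None:
    print("APScheduler is not installed")
elif major != 3:
    print(f"APScheduler {major}.x detected; the examples here target 3.x")
else:
    print("APScheduler 3.x detected")
```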
## Core Components Decision Tree
```
Need to schedule jobs?
├── Simple in-memory jobs → BackgroundScheduler
├── Web framework integration
│   ├── Flask/Django → BackgroundScheduler
│   ├── Tornado → TornadoScheduler
│   ├── Twisted → TwistedScheduler
│   └── AsyncIO → AsyncIOScheduler
└── Distributed/persistent jobs → Use database job store
```
## Basic Scheduler Setup
```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import atexit
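# Background scheduler (non-blocking): runs jobs in a background thread
scheduler = BackgroundScheduler()
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

# Blocking scheduler (for standalone scripts): start() blocks the calling thread
blocking_scheduler = BlockingScheduler()
# blocking_scheduler.start()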
```
## Job Scheduling Patterns
### Interval-based Jobs
```python
from datetime import datetime

def my_job():
    print(f"Job executed at {datetime.now()}")

# Every 10 seconds
scheduler.add_job(my_job, 'interval', seconds=10, id='job_1')

# Every 2 hours, counted from a fixed start date
scheduler.add_job(
    my_job,
    'interval',
    hours=2,
    start_date='2024-01-01 00:00:00',
    id='delayed_job'
)

# Using decorator
@scheduler.scheduled_job('interval', minutes=30)
def regular_cleanup():
    print("Running cleanup...")
```
### Cron-like Jobs
```python
# Daily at 3:30 AM
scheduler.add_job(
    my_job,
    'cron',
    hour=3,
    minute=30,
    id='daily_backup'
)

# Every weekday at 9 AM
scheduler.add_job(
    my_job,
    'cron',
    day_of_week='mon-fri',
    hour=9,
    minute=0
)

# Complex cron expression
scheduler.add_job(
    my_job,
    'cron',
    day='last',  # Last day of month
    hour=23,
    minute=59
)

# Using decorator with timezone
@scheduler.scheduled_job('cron', hour=0, timezone='UTC')
def midnight_utc_job():
    print("Midnight UTC task")
```
### One-time Jobs
```python
from datetime import datetime, timedelta
# Run once after 5 minutes
run_time = datetime.now() + timedelta(minutes=5)
scheduler.add_job(my_job, 'date', run_date=run_time, id='one_time')
# Run at specific date
scheduler.add_job(
    my_job,
    'date',
    run_date='2024-12-31 23:59:59',
    id='new_year_job'
)
```
## Advanced Configuration
### Custom Job Stores
```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Database job store
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    'redis': RedisJobStore(host='localhost', port=6379)
}

# Custom executors
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

# Job defaults
job_defaults = {
    'coalesce': False,
    'max_instances': 3,
    'misfire_grace_time': 30
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)
```
### Job Management
```python
# List all jobs
for job in scheduler.get_jobs():
    print(f"Job ID: {job.id}, Next run: {job.next_run_time}")

# Change the schedule of an existing job (modify_job only changes job
# attributes such as name or max_instances, not trigger arguments)
scheduler.reschedule_job('job_1', trigger='interval', seconds=30)

# Pause/resume jobs
scheduler.pause_job('job_1')
scheduler.resume_job('job_1')

# Remove jobs
scheduler.remove_job('job_1')

# Shutdown scheduler
scheduler.shutdown(wait=False)  # Don't wait for running jobs to complete
```
## Error Handling & Monitoring
```python
import logging
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('apscheduler')

def job_listener(event):
    if event.exception:
        print(f"Job {event.job_id} crashed: {event.exception}")
    else:
        print(f"Job {event.job_id} executed successfully")

# Add event listeners
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

def risky_operation():
    """Placeholder for the real work; replace with your own logic."""

# Job with error handling
def robust_job():
    try:
        # Your job logic here
        risky_operation()
    except Exception:
        # logger.exception records the traceback along with the message
        logger.exception("Job failed")
        # Handle error (retry, alert, etc.)

scheduler.add_job(robust_job, 'interval', minutes=5)
```
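The "retry" option mentioned in the comment above could be sketched as a plain decorator around the job function. This is one possible approach, not an APScheduler feature: `with_retries` and `flaky_job` are hypothetical names, and the decorator uses only the standard library.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def with_retries(attempts=3, delay=0.0):
    """Hypothetical helper: retry a job function a few times before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    logger.warning(
                        "Attempt %d/%d of %s failed: %s",
                        attempt, attempts, func.__name__, exc,
                    )
                    if attempt == attempts:
                        raise  # the final failure propagates to the caller
                    time.sleep(delay)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(attempts=3)
def flaky_job():
    """Stand-in job that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


result = flaky_job()
print(result, calls["count"])
```

A function decorated this way can be passed to `scheduler.add_job` like any other callable. Because the last failure is re-raised, a listener registered for `EVENT_JOB_ERROR` still sees jobs that exhaust their retries.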
## Framework Integration
### Flask Integration
```python
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

app = Flask(__name__)
scheduler = BackgroundScheduler(daemon=True)
# Start at import time; @app.before_first_request was removed in Flask 2.3
scheduler.start()

def background_task():
    with app.app_context():
        # Access Flask app context
        print("Background task with Flask context")

@app.route('/schedule-job')
def schedule_job():
    scheduler.add_job(
        id='flask_job',
        func=background_task,
        trigger='interval',
        seconds=30,
        replace_existing=True  # avoid ConflictingIdError on repeated requests
    )
    return "Job scheduled"

atexit.register(lambda: scheduler.shutdown())
```
### AsyncIO Integration
```python
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def async_job():
    print("Async job started")
    await asyncio.sleep(2)
    print("Async job completed")

async def main():
    # Create the scheduler inside the running loop so it binds to that loop
    scheduler = AsyncIOScheduler()
    scheduler.add_job(async_job, 'interval', seconds=10)
    scheduler.start()
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        scheduler.shutdown()

asyncio.run(main())
```
## Troubleshooting
### Common Errors & Solutions
**Error: `SchedulerAlreadyRunningError`**
```python
# Check if scheduler is running
if not scheduler.running:
    scheduler.start()
```
**Error: `JobLookupError: No job by the id of 'job_id' was found`**
```python
# Check if job exists before modifying
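if scheduler.get_job('job_id'):
    # reschedule_job changes the trigger; modify_job only changes job attributes
    scheduler.reschedule_job('job_id', trigger='interval', seconds=60)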
```
**Error: Jobs not executing**
```python
# Check job store and executor configuration
# (_jobstores and _executors are private attributes; use them for debugging only)
print("Job stores:", scheduler._jobstores)
print("Executors:", scheduler._executors)

# Verify job is scheduled
job = scheduler.get_job('job_id')
if job:
    print(f"Next run time: {job.next_run_time}")
```
**Memory leaks with many jobs**
```python
# Use job coalescing and limits
scheduler.add_job(
    my_job,
    'interval',
    seconds=10,
    coalesce=True,  # Combine missed executions
    max_instances=1,  # Only one instance at a time
    misfire_grace_time=30  # Grace period for missed jobs
)
```
### Debugging Configuration
```python
import logging
# Enable debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)
# APScheduler specific logging
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# Print scheduler state
print(f"Scheduler state: {scheduler.state}")
print(f"Running: {scheduler.running}")
```
## Performance Optimization
```python
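from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Optimize for high-frequency jobs
scheduler = BackgroundScheduler(
    executors={
        'default': ThreadPoolExecutor(max_workers=50),
        'processpool': ProcessPoolExecutor(max_workers=10)
    },
    job_defaults={
        'coalesce': True,
        'max_instances': 1,
        'misfire_grace_time': 10
    }
)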
# Use appropriate job store for scale
# SQLite: Small applications
# PostgreSQL/MySQL: Medium applications
# Redis: High-performance/distributed applications
```