---
name: "APScheduler"
description: "Skill for APScheduler — auto-generated from documentation"
version: "1.0.0"
author: "skynet"
category: "dev"
agents: ["claude-code", "codex", "gemini"]
tags: ["apscheduler", "dev", "auto-generated"]
---

# APScheduler

---
name: APScheduler
description: Advanced Python Scheduler for managing jobs with different schedulers, executors, and job stores. Use when you need to schedule tasks in Python applications - from simple cron-like jobs to complex distributed task scheduling.
metadata:
  author: skynet
  version: 1.0.0
category: dev
---

# APScheduler (Advanced Python Scheduler)

## Installation & Setup

```bash
# Basic installation
pip install apscheduler

# With database support
pip install apscheduler[sqlalchemy]

# With timezone support
pip install apscheduler[timezone]

# With all extras
pip install "apscheduler[sqlalchemy,timezone,redis,rethinkdb]"
```

## Core Components Decision Tree

```
Need to schedule jobs?
├── Simple in-memory jobs → BackgroundScheduler
├── Web framework integration
│   ├── Flask/Django → BackgroundScheduler
│   ├── Tornado → TornadoScheduler
│   ├── Twisted → TwistedScheduler
│   └── AsyncIO → AsyncIOScheduler
└── Distributed/persistent jobs → Use database job store
```

## Basic Scheduler Setup

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import atexit

# Background scheduler (non-blocking)
scheduler = BackgroundScheduler()
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

# Blocking scheduler (for standalone scripts)
scheduler = BlockingScheduler()
```

## Job Scheduling Patterns

### Interval-based Jobs
```python
import time
from datetime import datetime

def my_job():
    print(f"Job executed at {datetime.now()}")

# Every 10 seconds
scheduler.add_job(my_job, 'interval', seconds=10, id='job_1')

# Every 2 hours with start delay
scheduler.add_job(
    my_job, 
    'interval', 
    hours=2, 
    start_date='2024-01-01 00:00:00',
    id='delayed_job'
)

# Using decorator
@scheduler.scheduled_job('interval', minutes=30)
def regular_cleanup():
    print("Running cleanup...")
```

### Cron-like Jobs
```python
# Daily at 3:30 AM
scheduler.add_job(
    my_job,
    'cron',
    hour=3,
    minute=30,
    id='daily_backup'
)

# Every weekday at 9 AM
scheduler.add_job(
    my_job,
    'cron',
    day_of_week='mon-fri',
    hour=9,
    minute=0
)

# Complex cron expression
scheduler.add_job(
    my_job,
    'cron',
    day='last',  # Last day of month
    hour=23,
    minute=59
)

# Using decorator with timezone
@scheduler.scheduled_job('cron', hour=0, timezone='UTC')
def midnight_utc_job():
    print("Midnight UTC task")
```

### One-time Jobs
```python
from datetime import datetime, timedelta

# Run once after 5 minutes
run_time = datetime.now() + timedelta(minutes=5)
scheduler.add_job(my_job, 'date', run_date=run_time, id='one_time')

# Run at specific date
scheduler.add_job(
    my_job,
    'date',
    run_date='2024-12-31 23:59:59',
    id='new_year_job'
)
```

## Advanced Configuration

### Custom Job Stores
```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# Database job store
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    'redis': RedisJobStore(host='localhost', port=6379)
}

# Custom executors
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

# Job defaults
job_defaults = {
    'coalesce': False,
    'max_instances': 3,
    'misfire_grace_time': 30
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)
```

### Job Management
```python
# List all jobs
for job in scheduler.get_jobs():
    print(f"Job ID: {job.id}, Next run: {job.next_run_time}")

# Modify existing job
scheduler.modify_job('job_1', seconds=30)

# Pause/resume jobs
scheduler.pause_job('job_1')
scheduler.resume_job('job_1')

# Remove jobs
scheduler.remove_job('job_1')

# Shutdown scheduler
scheduler.shutdown(wait=False)  # Don't wait for jobs to complete
```

## Error Handling & Monitoring

```python
import logging
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('apscheduler')

def job_listener(event):
    if event.exception:
        print(f"Job {event.job_id} crashed: {event.exception}")
    else:
        print(f"Job {event.job_id} executed successfully")

# Add event listeners
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# Job with error handling
def robust_job():
    try:
        # Your job logic here
        risky_operation()
    except Exception as e:
        logger.error(f"Job failed: {e}")
        # Handle error (retry, alert, etc.)

scheduler.add_job(robust_job, 'interval', minutes=5)
```

## Framework Integration

### Flask Integration
```python
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

app = Flask(__name__)
scheduler = BackgroundScheduler(daemon=True)

@app.before_first_request
def initialize_scheduler():
    scheduler.start()

@app.route('/schedule-job')
def schedule_job():
    scheduler.add_job(
        id='flask_job',
        func=background_task,
        trigger='interval',
        seconds=30
    )
    return "Job scheduled"

def background_task():
    with app.app_context():
        # Access Flask app context
        print("Background task with Flask context")

atexit.register(lambda: scheduler.shutdown())
```

### AsyncIO Integration
```python
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def async_job():
    print("Async job started")
    await asyncio.sleep(2)
    print("Async job completed")

scheduler = AsyncIOScheduler()
scheduler.add_job(async_job, 'interval', seconds=10)

async def main():
    scheduler.start()
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        scheduler.shutdown()

asyncio.run(main())
```

## Troubleshooting

### Common Errors & Solutions

**Error: `SchedulerAlreadyRunningError`**
```python
# Check if scheduler is running
if not scheduler.running:
    scheduler.start()
```

**Error: `JobLookupError: No job by the id of 'job_id' was found`**
```python
# Check if job exists before modifying
if scheduler.get_job('job_id'):
    scheduler.modify_job('job_id', seconds=60)
```

**Error: Jobs not executing**
```python
# Check job store and executor configuration
print("Job stores:", scheduler._jobstores)
print("Executors:", scheduler._executors)

# Verify job is scheduled
job = scheduler.get_job('job_id')
if job:
    print(f"Next run time: {job.next_run_time}")
```

**Memory leaks with many jobs**
```python
# Use job coalescing and limits
scheduler.add_job(
    my_job,
    'interval',
    seconds=10,
    coalesce=True,        # Combine missed executions
    max_instances=1,      # Only one instance at a time
    misfire_grace_time=30 # Grace period for missed jobs
)
```

### Debugging Configuration

```python
import logging

# Enable debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

# APScheduler specific logging
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# Print scheduler state
print(f"Scheduler state: {scheduler.state}")
print(f"Running: {scheduler.running}")
```

## Performance Optimization

```python
# Optimize for high-frequency jobs
scheduler = BackgroundScheduler(
    executors={
        'default': ThreadPoolExecutor(max_workers=50),
        'processpool': ProcessPoolExecutor(max_workers=10)
    },
    job_defaults={
        'coalesce': True,
        'max_instances': 1,
        'misfire_grace_time': 10
    }
)

# Use appropriate job store for scale
# SQLite: Small applications
# PostgreSQL/MySQL: Medium applications  
# Redis: High-performance/distributed applications
```
