APScheduler

Skill for APScheduler — auto-generated from documentation

Category: dev
By skynet, v1.0.0
Tags: apscheduler, dev, auto-generated

Compatible Agents

claude-code, codex, gemini

Instruction

---
name: APScheduler
description: Advanced Python Scheduler for managing jobs with different schedulers, executors, and job stores. Use when you need to schedule tasks in Python applications - from simple cron-like jobs to complex distributed task scheduling.
metadata:
  author: skynet
  version: 1.0.0
  category: dev
---

# APScheduler (Advanced Python Scheduler)

## Installation & Setup

```bash
# Basic installation (the examples below use the 3.x API)
pip install apscheduler

# With database support
pip install "apscheduler[sqlalchemy]"

# Timezone handling ships with the base package; no extra is needed

# With several extras
pip install "apscheduler[sqlalchemy,redis,rethinkdb]"
```

## Core Components Decision Tree

```
Need to schedule jobs?
├── Simple in-memory jobs → BackgroundScheduler
├── Web framework integration
│   ├── Flask/Django → BackgroundScheduler
│   ├── Tornado → TornadoScheduler
│   ├── Twisted → TwistedScheduler
│   └── AsyncIO → AsyncIOScheduler
└── Distributed/persistent jobs → Use database job store
```

## Basic Scheduler Setup

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
import atexit

# Background scheduler (non-blocking): runs jobs in a separate thread
scheduler = BackgroundScheduler()
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

# Blocking scheduler (for standalone scripts): start() blocks the caller
blocking_scheduler = BlockingScheduler()
# blocking_scheduler.start()
```

## Job Scheduling Patterns

### Interval-based Jobs

```python
from datetime import datetime

def my_job():
    print(f"Job executed at {datetime.now()}")

# Every 10 seconds
scheduler.add_job(my_job, 'interval', seconds=10, id='job_1')

# Every 2 hours, with the first run no earlier than start_date
scheduler.add_job(
    my_job,
    'interval',
    hours=2,
    start_date='2024-01-01 00:00:00',
    id='delayed_job'
)

# Using decorator
@scheduler.scheduled_job('interval', minutes=30)
def regular_cleanup():
    print("Running cleanup...")
```

### Cron-like Jobs

```python
# Daily at 3:30 AM
scheduler.add_job(
    my_job,
    'cron',
    hour=3,
    minute=30,
    id='daily_backup'
)

# Every weekday at 9 AM
scheduler.add_job(
    my_job,
    'cron',
    day_of_week='mon-fri',
    hour=9,
    minute=0
)

# Complex cron expression
scheduler.add_job(
    my_job,
    'cron',
    day='last',  # Last day of month
    hour=23,
    minute=59
)

# Using decorator with timezone
@scheduler.scheduled_job('cron', hour=0, timezone='UTC')
def midnight_utc_job():
    print("Midnight UTC task")
```

### One-time Jobs

```python
from datetime import datetime, timedelta

# Run once after 5 minutes
run_time = datetime.now() + timedelta(minutes=5)
scheduler.add_job(my_job, 'date', run_date=run_time, id='one_time')

# Run at specific date
scheduler.add_job(
    my_job,
    'date',
    run_date='2024-12-31 23:59:59',
    id='new_year_job'
)
```

## Advanced Configuration

### Custom Job Stores

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Database job stores
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    'redis': RedisJobStore(host='localhost', port=6379)
}

# Custom executors
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

# Job defaults
job_defaults = {
    'coalesce': False,
    'max_instances': 3,
    'misfire_grace_time': 30
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)
```

### Job Management

```python
# List all jobs
for job in scheduler.get_jobs():
    print(f"Job ID: {job.id}, Next run: {job.next_run_time}")

# Modify attributes of an existing job (not its trigger arguments)
scheduler.modify_job('job_1', max_instances=2)

# Change the schedule of an existing job
scheduler.reschedule_job('job_1', trigger='interval', seconds=30)

# Pause/resume jobs
scheduler.pause_job('job_1')
scheduler.resume_job('job_1')

# Remove jobs
scheduler.remove_job('job_1')

# Shutdown scheduler
scheduler.shutdown(wait=False)  # Don't wait for jobs to complete
```

## Error Handling & Monitoring

```python
import logging
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('apscheduler')

def job_listener(event):
    if event.exception:
        print(f"Job {event.job_id} crashed: {event.exception}")
    else:
        print(f"Job {event.job_id} executed successfully")

# Add event listeners
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# Job with error handling
def robust_job():
    try:
        # Your job logic here; risky_operation is a placeholder
        risky_operation()
    except Exception as e:
        logger.error(f"Job failed: {e}")
        # Handle error (retry, alert, etc.)

scheduler.add_job(robust_job, 'interval', minutes=5)
```

## Framework Integration

### Flask Integration

```python
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

app = Flask(__name__)
scheduler = BackgroundScheduler(daemon=True)

# Start the scheduler when the module is imported
# (@app.before_first_request is not available in Flask 2.3+)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

def background_task():
    with app.app_context():
        # Access Flask app context
        print("Background task with Flask context")

@app.route('/schedule-job')
def schedule_job():
    scheduler.add_job(
        id='flask_job',
        func=background_task,
        trigger='interval',
        seconds=30,
        replace_existing=True  # Avoid ConflictingIdError on repeat requests
    )
    return "Job scheduled"
```

### AsyncIO Integration

```python
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def async_job():
    print("Async job started")
    await asyncio.sleep(2)
    print("Async job completed")

scheduler = AsyncIOScheduler()
scheduler.add_job(async_job, 'interval', seconds=10)

async def main():
    # Start inside the running event loop
    scheduler.start()
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        scheduler.shutdown()

asyncio.run(main())
```

## Troubleshooting

### Common Errors & Solutions

**Error: `SchedulerAlreadyRunningError`**

```python
# Check if scheduler is running
if not scheduler.running:
    scheduler.start()
```

**Error: `JobLookupError: No job by the id of 'job_id' was found`**

```python
# Check if job exists before changing it
if scheduler.get_job('job_id'):
    scheduler.reschedule_job('job_id', trigger='interval', seconds=60)
```

**Error: Jobs not executing**

```python
# Check job store and executor configuration
# (_jobstores and _executors are private attributes; use for debugging only)
print("Job stores:", scheduler._jobstores)
print("Executors:", scheduler._executors)

# Verify job is scheduled
job = scheduler.get_job('job_id')
if job:
    print(f"Next run time: {job.next_run_time}")
```

**Backlog or overlapping runs with many jobs**

```python
# Use job coalescing and limits
scheduler.add_job(
    my_job,
    'interval',
    seconds=10,
    coalesce=True,          # Combine missed executions into one run
    max_instances=1,        # Only one instance at a time
    misfire_grace_time=30   # Seconds a run may be late before it is skipped
)
```

### Debugging Configuration

```python
import logging

# Enable debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

# APScheduler specific logging
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# Print scheduler state
print(f"Scheduler state: {scheduler.state}")
print(f"Running: {scheduler.running}")
```

## Performance Optimization

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Optimize for high-frequency jobs
scheduler = BackgroundScheduler(
    executors={
        'default': ThreadPoolExecutor(max_workers=50),
        'processpool': ProcessPoolExecutor(max_workers=10)
    },
    job_defaults={
        'coalesce': True,
        'max_instances': 1,
        'misfire_grace_time': 10
    }
)

# Use appropriate job store for scale
# SQLite: Small applications
# PostgreSQL/MySQL: Medium applications
# Redis: High-performance applications
# Note: in APScheduler 3.x, a job store must not be shared by
# several running schedulers at the same time
```
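The `coalesce` and `misfire_grace_time` defaults used above are easier to choose once their interaction is clear. The following is a minimal, standard-library-only sketch of that interaction, assuming the behaviour described in the APScheduler 3.x documentation: all fire times that have come due are collected, `coalesce` keeps only the latest one, and any run that is later than the grace period counts as a misfire and is skipped. The helper names `due_run_times` and `select_runs` are hypothetical and are not part of the APScheduler API; this is a simplified model, not the library's implementation.

```python
from datetime import datetime, timedelta


def due_run_times(next_run, interval, now):
    """List every scheduled fire time from next_run up to and including now."""
    times = []
    while next_run <= now:
        times.append(next_run)
        next_run += interval
    return times


def select_runs(run_times, now, coalesce, misfire_grace_time):
    """Split pending fire times into (to_run, misfired).

    Hypothetical helper modelling the documented semantics:
    coalesce keeps only the most recent pending fire time, and a run
    later than misfire_grace_time seconds is skipped as a misfire.
    """
    if coalesce and run_times:
        run_times = run_times[-1:]
    to_run, misfired = [], []
    for fire_time in run_times:
        late_by = (now - fire_time).total_seconds()
        if misfire_grace_time is not None and late_by > misfire_grace_time:
            misfired.append(fire_time)
        else:
            to_run.append(fire_time)
    return to_run, misfired


# A 10-second interval job whose scheduler was stalled for 45 seconds
start = datetime(2024, 1, 1, 0, 0, 0)
now = start + timedelta(seconds=45)
pending = due_run_times(start, timedelta(seconds=10), now)

ran, missed = select_runs(pending, now, coalesce=False, misfire_grace_time=30)
ran_c, missed_c = select_runs(pending, now, coalesce=True, misfire_grace_time=30)

print(f"pending={len(pending)}")
print(f"coalesce=False: run={len(ran)} misfired={len(missed)}")
print(f"coalesce=True:  run={len(ran_c)} misfired={len(missed_c)}")
```

In this model, a stalled scheduler with `coalesce=False` replays every fire time still inside the grace period, while `coalesce=True` runs the job once. That is why the high-frequency configuration above pairs `coalesce=True` with a short `misfire_grace_time`.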

Install

curl -s https://skills.skynet.ceo/api/skills/apscheduler/skill.md