Back to library

SQLAlchemy Async

Skill for SQLAlchemy Async — auto-generated from documentation

dev
by skynet · v1.0.0
sqlalchemy-async · dev · auto-generated

0

Total Uses

0

Successes

0%

Success Rate

Compatible Agents

claude-code · codex · gemini

Instruction

---
name: SQLAlchemy Async
description: Use SQLAlchemy's async/await support for non-blocking database operations in Python applications. Essential for building high-performance web APIs and concurrent applications that need efficient database access.
category: dev
metadata:
  author: skynet
  version: 1.0.0
---

# SQLAlchemy Async

## Installation & Setup

```bash
# Install SQLAlchemy with async support
# (quotes stop shells such as zsh from globbing the [asyncio] extra)
pip install "sqlalchemy[asyncio]" "psycopg[binary]"  # PostgreSQL (psycopg 3; needs SQLAlchemy 2.0+, URL postgresql+psycopg://)
pip install "sqlalchemy[asyncio]" aiomysql           # MySQL
pip install "sqlalchemy[asyncio]" aiosqlite          # SQLite

# For asyncpg (faster PostgreSQL driver)
pip install "sqlalchemy[asyncio]" asyncpg
```

## Basic Configuration

```python
# config.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

# Database URL formats
DATABASE_URLS = {
    'postgresql': 'postgresql+asyncpg://user:pass@localhost/db',
    'mysql': 'mysql+aiomysql://user:pass@localhost/db',
    'sqlite': 'sqlite+aiosqlite:///./test.db'
}

# Create async engine
engine = create_async_engine(
    DATABASE_URLS['postgresql'],
    echo=True,  # Log SQL queries
    pool_size=20,
    max_overflow=0
)

# Async session factory
# expire_on_commit=False keeps loaded attributes readable after commit,
# which matters because AsyncSession cannot reload them implicitly.
async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

Base = declarative_base()
```

## Model Definition

```python
# models.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime

from config import Base  # shared declarative base defined in config.py


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship("Post", back_populates="author")


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(1000))
    user_id = Column(Integer, ForeignKey("users.id"))

    author = relationship("User", back_populates="posts")
```

## Database Initialization

```python
# init_db.py
import asyncio
from config import engine, Base
import models  # noqa: F401 - importing registers User/Post on Base.metadata


async def create_tables():
    # run_sync bridges the synchronous metadata API onto the async connection
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def drop_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


# Run initialization
if __name__ == "__main__":
    asyncio.run(create_tables())
```

## CRUD Operations

```python
# crud.py
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from config import async_session
from models import User, Post


class UserCRUD:
    @staticmethod
    async def create_user(username: str, email: str):
        async with async_session() as session:
            user = User(username=username, email=email)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user

    @staticmethod
    async def get_user(user_id: int):
        async with async_session() as session:
            stmt = select(User).where(User.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    @staticmethod
    async def get_user_with_posts(user_id: int):
        async with async_session() as session:
            # Eager-load posts so they stay usable after the session closes
            stmt = select(User).options(
                selectinload(User.posts)
            ).where(User.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    @staticmethod
    async def list_users(limit: int = 10, offset: int = 0):
        async with async_session() as session:
            stmt = select(User).limit(limit).offset(offset)
            result = await session.execute(stmt)
            return result.scalars().all()

    @staticmethod
    async def update_user(user_id: int, **kwargs):
        async with async_session() as session:
            stmt = update(User).where(User.id == user_id).values(**kwargs)
            await session.execute(stmt)
            await session.commit()

    @staticmethod
    async def delete_user(user_id: int):
        async with async_session() as session:
            stmt = delete(User).where(User.id == user_id)
            await session.execute(stmt)
            await session.commit()
```

## Query Patterns

```python
# queries.py
from datetime import datetime

from sqlalchemy import func, and_, or_, select
from config import async_session
from models import User, Post


async def complex_queries():
    async with async_session() as session:
        # Count users
        count_stmt = select(func.count(User.id))
        user_count = await session.scalar(count_stmt)

        # Filter with multiple conditions
        # (pass a datetime, not a string: asyncpg rejects str for timestamp parameters)
        filter_stmt = select(User).where(
            and_(
                User.username.like('%admin%'),
                User.created_at >= datetime(2023, 1, 1)
            )
        )
        filtered_users = await session.execute(filter_stmt)

        # Join query
        join_stmt = select(User, Post).join(Post.author).where(
            Post.title.contains('SQLAlchemy')
        )
        user_posts = await session.execute(join_stmt)

        # Subquery
        subquery = select(func.count(Post.id)).where(
            Post.user_id == User.id
        ).scalar_subquery()

        users_with_post_count = select(
            User.username,
            subquery.label('post_count')
        )
        result = await session.execute(users_with_post_count)

        return {
            'total_users': user_count,
            'filtered_users': filtered_users.scalars().all(),
            'user_posts': user_posts.all(),
            'post_counts': result.all()
        }
```

## Transaction Management

```python
# transactions.py
from sqlalchemy.exc import SQLAlchemyError
from config import async_session
from models import User, Post


async def create_user_with_post(username: str, email: str, post_title: str):
    async with async_session() as session:
        try:
            async with session.begin():
                # Create user
                user = User(username=username, email=email)
                session.add(user)
                await session.flush()  # Get user.id without committing

                # Create post
                post = Post(title=post_title, user_id=user.id)
                session.add(post)

                # Commit happens automatically
                return user.id
        except SQLAlchemyError as e:
            # Rollback happens automatically
            print(f"Transaction failed: {e}")
            raise


# Manual transaction control
async def manual_transaction():
    async with async_session() as session:
        try:
            user = User(username="test", email="test@example.com")
            session.add(user)
            await session.commit()

            # Separate transaction
            await session.refresh(user)
            user.username = "updated"
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

## FastAPI Integration

```python
# main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List
from crud import UserCRUD

app = FastAPI()


class UserCreate(BaseModel):
    username: str
    email: str


class UserResponse(BaseModel):
    id: int
    username: str
    email: str

    class Config:
        # Pydantic v1 setting; on Pydantic v2 use
        # `model_config = ConfigDict(from_attributes=True)` instead.
        orm_mode = True


@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
    try:
        db_user = await UserCRUD.create_user(
            username=user.username,
            email=user.email
        )
        return db_user
    except Exception as e:
        # Chain the cause so the original database error stays in the traceback
        raise HTTPException(status_code=400, detail=str(e)) from e


@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    user = await UserCRUD.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@app.get("/users/", response_model=List[UserResponse])
async def list_users(limit: int = 10, offset: int = 0):
    return await UserCRUD.list_users(limit=limit, offset=offset)
```

## Decision Tree

```
Need database operations?
├── Simple CRUD operations?
│   ├── Single model → Use basic async session
│   └── Multiple models → Use transaction blocks
├── Complex queries?
│   ├── Joins needed → Use selectinload/joinedload
│   ├── Aggregations → Use func and scalar()
│   └── Raw SQL → Use text() with async execution
├── High concurrency?
│   ├── Connection pooling → Configure pool_size/max_overflow
│   ├── Session management → Use dependency injection
│   └── Background tasks → Use separate session per task
└── Performance critical?
    ├── Query optimization → Use explain, indexing
    ├── Lazy loading → Configure relationship loading
    └── Caching → Implement query result caching
```

## Performance Optimization

```python
# optimization.py
from datetime import datetime
from typing import List

from sqlalchemy import select, text
from sqlalchemy.orm import selectinload, joinedload
from config import async_session, engine
from models import User


# Efficient bulk operations
async def bulk_insert_users(user_data: List[dict]):
    async with async_session() as session:
        session.add_all([User(**data) for data in user_data])
        await session.commit()


# Raw SQL for performance
async def raw_query_example():
    async with engine.begin() as conn:
        result = await conn.execute(
            text("SELECT * FROM users WHERE created_at > :date"),
            # Bind a datetime, not a string: asyncpg rejects str for timestamp parameters
            {"date": datetime(2023, 1, 1)}
        )
        return result.fetchall()


# Optimize N+1 queries
async def optimized_user_posts():
    async with async_session() as session:
        stmt = select(User).options(selectinload(User.posts))
        result = await session.execute(stmt)
        users = result.scalars().all()

        # Now access posts without additional queries
        for user in users:
            print(f"{user.username}: {len(user.posts)} posts")
```

## Troubleshooting

### Common Errors and Fixes

**Error**: `RuntimeError: asyncio.run() cannot be called from a running event loop`

```python
# Wrong
asyncio.run(some_async_function())

# Correct - use await in async context
await some_async_function()
```

**Error**: `InvalidRequestError: Object is not bound to a Session`

```python
# Wrong - using object outside session
async with async_session() as session:
    user = await session.get(User, 1)
# user is detached here
print(user.posts)  # Error!

# Correct - access relationships within session or use expire_on_commit=False
# NOTE: AsyncSession cannot lazy-load on attribute access; a bare `user.posts`
# on an unloaded relationship raises MissingGreenlet even inside the session.
# Load the relationship eagerly instead (needs
# `from sqlalchemy.orm import selectinload`). expire_on_commit=False only keeps
# already-loaded attributes readable after commit; it does not load relationships.
async with async_session() as session:
    user = await session.get(User, 1, options=[selectinload(User.posts)])
    posts = user.posts  # Access within session; already loaded, no implicit I/O
```

**Error**: `AttributeError: 'AsyncSession' object has no attribute 'query'`

```python
# Wrong - using sync ORM patterns
session.query(User).all()

# Correct - use select() with execute()
stmt = select(User)
result = await session.execute(stmt)
users = result.scalars().all()
```

**Connection Pool Issues**:

```python
# Add connection pool monitoring
# DATABASE_URL is your connection string, e.g. DATABASE_URLS['postgresql'] from config.py
engine = create_async_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # Verify connections
    pool_recycle=300,    # Recycle every 5 minutes
    echo_pool=True       # Log pool events
)
```

Install

curl -s https://skills.skynet.ceo/api/skills/sqlalchemy-async/skill.md