---
name: "SQLAlchemy Async"
description: "Skill for SQLAlchemy Async — auto-generated from documentation"
version: "1.0.0"
author: "skynet"
category: "dev"
agents: ["claude-code", "codex", "gemini"]
tags: ["sqlalchemy-async", "dev", "auto-generated"]
---

# SQLAlchemy Async

---
name: SQLAlchemy Async
description: Use SQLAlchemy's async/await support for non-blocking database operations in Python applications. Essential for building high-performance web APIs and concurrent applications that need efficient database access.
category: dev
metadata:
  author: skynet
  version: 1.0.0
---

# SQLAlchemy Async

## Installation & Setup

```bash
# Install SQLAlchemy with async support
pip install sqlalchemy[asyncio] aiopg  # PostgreSQL
pip install sqlalchemy[asyncio] aiomysql  # MySQL
pip install sqlalchemy[asyncio] aiosqlite  # SQLite

# For asyncpg (faster PostgreSQL driver)
pip install sqlalchemy[asyncio] asyncpg
```

## Basic Configuration

```python
# config.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

# Database URL formats
DATABASE_URLS = {
    'postgresql': 'postgresql+asyncpg://user:pass@localhost/db',
    'mysql': 'mysql+aiomysql://user:pass@localhost/db',
    'sqlite': 'sqlite+aiosqlite:///./test.db'
}

# Create async engine
engine = create_async_engine(
    DATABASE_URLS['postgresql'],
    echo=True,  # Log SQL queries
    pool_size=20,
    max_overflow=0
)

# Async session factory
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

Base = declarative_base()
```

## Model Definition

```python
# models.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(1000))
    user_id = Column(Integer, ForeignKey("users.id"))
    
    author = relationship("User", back_populates="posts")
```

## Database Initialization

```python
# init_db.py
import asyncio
from config import engine, Base

async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

async def drop_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

# Run initialization
if __name__ == "__main__":
    asyncio.run(create_tables())
```

## CRUD Operations

```python
# crud.py
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from config import async_session
from models import User, Post

class UserCRUD:
    @staticmethod
    async def create_user(username: str, email: str):
        async with async_session() as session:
            user = User(username=username, email=email)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user
    
    @staticmethod
    async def get_user(user_id: int):
        async with async_session() as session:
            stmt = select(User).where(User.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()
    
    @staticmethod
    async def get_user_with_posts(user_id: int):
        async with async_session() as session:
            stmt = select(User).options(
                selectinload(User.posts)
            ).where(User.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()
    
    @staticmethod
    async def list_users(limit: int = 10, offset: int = 0):
        async with async_session() as session:
            stmt = select(User).limit(limit).offset(offset)
            result = await session.execute(stmt)
            return result.scalars().all()
    
    @staticmethod
    async def update_user(user_id: int, **kwargs):
        async with async_session() as session:
            stmt = update(User).where(User.id == user_id).values(**kwargs)
            await session.execute(stmt)
            await session.commit()
    
    @staticmethod
    async def delete_user(user_id: int):
        async with async_session() as session:
            stmt = delete(User).where(User.id == user_id)
            await session.execute(stmt)
            await session.commit()
```

## Query Patterns

```python
# queries.py
from sqlalchemy import func, and_, or_
from config import async_session
from models import User, Post

async def complex_queries():
    async with async_session() as session:
        # Count users
        count_stmt = select(func.count(User.id))
        user_count = await session.scalar(count_stmt)
        
        # Filter with multiple conditions
        filter_stmt = select(User).where(
            and_(
                User.username.like('%admin%'),
                User.created_at >= '2023-01-01'
            )
        )
        filtered_users = await session.execute(filter_stmt)
        
        # Join query
        join_stmt = select(User, Post).join(Post.author).where(
            Post.title.contains('SQLAlchemy')
        )
        user_posts = await session.execute(join_stmt)
        
        # Subquery
        subquery = select(func.count(Post.id)).where(
            Post.user_id == User.id
        ).scalar_subquery()
        
        users_with_post_count = select(
            User.username, 
            subquery.label('post_count')
        )
        result = await session.execute(users_with_post_count)
        
        return {
            'total_users': user_count,
            'filtered_users': filtered_users.scalars().all(),
            'user_posts': user_posts.all(),
            'post_counts': result.all()
        }
```

## Transaction Management

```python
# transactions.py
from sqlalchemy.exc import SQLAlchemyError
from config import async_session
from models import User, Post

async def create_user_with_post(username: str, email: str, post_title: str):
    async with async_session() as session:
        try:
            async with session.begin():
                # Create user
                user = User(username=username, email=email)
                session.add(user)
                await session.flush()  # Get user.id without committing
                
                # Create post
                post = Post(title=post_title, user_id=user.id)
                session.add(post)
                
                # Commit happens automatically
                return user.id
                
        except SQLAlchemyError as e:
            # Rollback happens automatically
            print(f"Transaction failed: {e}")
            raise

# Manual transaction control
async def manual_transaction():
    async with async_session() as session:
        try:
            user = User(username="test", email="test@example.com")
            session.add(user)
            await session.commit()
            
            # Separate transaction
            await session.refresh(user)
            user.username = "updated"
            await session.commit()
            
        except Exception as e:
            await session.rollback()
            raise
```

## FastAPI Integration

```python
# main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List
from crud import UserCRUD

app = FastAPI()

class UserCreate(BaseModel):
    username: str
    email: str

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    
    class Config:
        orm_mode = True

@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
    try:
        db_user = await UserCRUD.create_user(
            username=user.username, 
            email=user.email
        )
        return db_user
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    user = await UserCRUD.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.get("/users/", response_model=List[UserResponse])
async def list_users(limit: int = 10, offset: int = 0):
    return await UserCRUD.list_users(limit=limit, offset=offset)
```

## Decision Tree

```
Need database operations?
├── Simple CRUD operations?
│   ├── Single model → Use basic async session
│   └── Multiple models → Use transaction blocks
├── Complex queries?
│   ├── Joins needed → Use selectinload/joinedload
│   ├── Aggregations → Use func and scalar()
│   └── Raw SQL → Use text() with async execution
├── High concurrency?
│   ├── Connection pooling → Configure pool_size/max_overflow
│   ├── Session management → Use dependency injection
│   └── Background tasks → Use separate session per task
└── Performance critical?
    ├── Query optimization → Use explain, indexing
    ├── Lazy loading → Configure relationship loading
    └── Caching → Implement query result caching
```

## Performance Optimization

```python
# optimization.py
from sqlalchemy import text
from sqlalchemy.orm import selectinload, joinedload
from config import async_session, engine

# Efficient bulk operations
async def bulk_insert_users(user_data: List[dict]):
    async with async_session() as session:
        session.add_all([User(**data) for data in user_data])
        await session.commit()

# Raw SQL for performance
async def raw_query_example():
    async with engine.begin() as conn:
        result = await conn.execute(
            text("SELECT * FROM users WHERE created_at > :date"),
            {"date": "2023-01-01"}
        )
        return result.fetchall()

# Optimize N+1 queries
async def optimized_user_posts():
    async with async_session() as session:
        stmt = select(User).options(selectinload(User.posts))
        result = await session.execute(stmt)
        users = result.scalars().all()
        
        # Now access posts without additional queries
        for user in users:
            print(f"{user.username}: {len(user.posts)} posts")
```

## Troubleshooting

### Common Errors and Fixes

**Error**: `RuntimeError: asyncio.run() cannot be called from a running event loop`
```python
# Wrong
asyncio.run(some_async_function())

# Correct - use await in async context
await some_async_function()
```

**Error**: `InvalidRequestError: Object is not bound to a Session`
```python
# Wrong - using object outside session
async with async_session() as session:
    user = await session.get(User, 1)
# user is detached here

print(user.posts)  # Error!

# Correct - access relationships within session or use expire_on_commit=False
async with async_session() as session:
    user = await session.get(User, 1)
    posts = user.posts  # Access within session
```

**Error**: `AttributeError: 'AsyncSession' object has no attribute 'query'`
```python
# Wrong - using sync ORM patterns
session.query(User).all()

# Correct - use select() with execute()
stmt = select(User)
result = await session.execute(stmt)
users = result.scalars().all()
```

**Connection Pool Issues**:
```python
# Add connection pool monitoring
engine = create_async_engine(
    DATABASE_URL,
    pool_pre_ping=True,  # Verify connections
    pool_recycle=300,    # Recycle every 5 minutes
    echo_pool=True       # Log pool events
)
```
