SQLAlchemy Async — SKILL.md
Raw skill file that agents receive when using this skill
---
name: "SQLAlchemy Async"
description: "Skill for SQLAlchemy Async — auto-generated from documentation"
version: "1.0.0"
author: "skynet"
category: "dev"
agents: ["claude-code", "codex", "gemini"]
tags: ["sqlalchemy-async", "dev", "auto-generated"]
---
# SQLAlchemy Async
---
name: SQLAlchemy Async
description: Use SQLAlchemy's async/await support for non-blocking database operations in Python applications. Essential for building high-performance web APIs and concurrent applications that need efficient database access.
category: dev
metadata:
author: skynet
version: 1.0.0
---
# SQLAlchemy Async
## Installation & Setup
```bash
# Install SQLAlchemy with async support
pip install sqlalchemy[asyncio] aiopg # PostgreSQL
pip install sqlalchemy[asyncio] aiomysql # MySQL
pip install sqlalchemy[asyncio] aiosqlite # SQLite
# For asyncpg (faster PostgreSQL driver)
pip install sqlalchemy[asyncio] asyncpg
```
## Basic Configuration
```python
# config.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
# Database URL formats
DATABASE_URLS = {
'postgresql': 'postgresql+asyncpg://user:pass@localhost/db',
'mysql': 'mysql+aiomysql://user:pass@localhost/db',
'sqlite': 'sqlite+aiosqlite:///./test.db'
}
# Create async engine
engine = create_async_engine(
DATABASE_URLS['postgresql'],
echo=True, # Log SQL queries
pool_size=20,
max_overflow=0
)
# Async session factory
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
```
## Model Definition
```python
# models.py
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(String(1000))
user_id = Column(Integer, ForeignKey("users.id"))
author = relationship("User", back_populates="posts")
```
## Database Initialization
```python
# init_db.py
import asyncio
from config import engine, Base
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def drop_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
# Run initialization
if __name__ == "__main__":
asyncio.run(create_tables())
```
## CRUD Operations
```python
# crud.py
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from config import async_session
from models import User, Post
class UserCRUD:
@staticmethod
async def create_user(username: str, email: str):
async with async_session() as session:
user = User(username=username, email=email)
session.add(user)
await session.commit()
await session.refresh(user)
return user
@staticmethod
async def get_user(user_id: int):
async with async_session() as session:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
@staticmethod
async def get_user_with_posts(user_id: int):
async with async_session() as session:
stmt = select(User).options(
selectinload(User.posts)
).where(User.id == user_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
@staticmethod
async def list_users(limit: int = 10, offset: int = 0):
async with async_session() as session:
stmt = select(User).limit(limit).offset(offset)
result = await session.execute(stmt)
return result.scalars().all()
@staticmethod
async def update_user(user_id: int, **kwargs):
async with async_session() as session:
stmt = update(User).where(User.id == user_id).values(**kwargs)
await session.execute(stmt)
await session.commit()
@staticmethod
async def delete_user(user_id: int):
async with async_session() as session:
stmt = delete(User).where(User.id == user_id)
await session.execute(stmt)
await session.commit()
```
## Query Patterns
```python
# queries.py
from sqlalchemy import func, and_, or_
from config import async_session
from models import User, Post
async def complex_queries():
async with async_session() as session:
# Count users
count_stmt = select(func.count(User.id))
user_count = await session.scalar(count_stmt)
# Filter with multiple conditions
filter_stmt = select(User).where(
and_(
User.username.like('%admin%'),
User.created_at >= '2023-01-01'
)
)
filtered_users = await session.execute(filter_stmt)
# Join query
join_stmt = select(User, Post).join(Post.author).where(
Post.title.contains('SQLAlchemy')
)
user_posts = await session.execute(join_stmt)
# Subquery
subquery = select(func.count(Post.id)).where(
Post.user_id == User.id
).scalar_subquery()
users_with_post_count = select(
User.username,
subquery.label('post_count')
)
result = await session.execute(users_with_post_count)
return {
'total_users': user_count,
'filtered_users': filtered_users.scalars().all(),
'user_posts': user_posts.all(),
'post_counts': result.all()
}
```
## Transaction Management
```python
# transactions.py
from sqlalchemy.exc import SQLAlchemyError
from config import async_session
from models import User, Post
async def create_user_with_post(username: str, email: str, post_title: str):
async with async_session() as session:
try:
async with session.begin():
# Create user
user = User(username=username, email=email)
session.add(user)
await session.flush() # Get user.id without committing
# Create post
post = Post(title=post_title, user_id=user.id)
session.add(post)
# Commit happens automatically
return user.id
except SQLAlchemyError as e:
# Rollback happens automatically
print(f"Transaction failed: {e}")
raise
# Manual transaction control
async def manual_transaction():
async with async_session() as session:
try:
user = User(username="test", email="test@example.com")
session.add(user)
await session.commit()
# Separate transaction
await session.refresh(user)
user.username = "updated"
await session.commit()
except Exception as e:
await session.rollback()
raise
```
## FastAPI Integration
```python
# main.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List
from crud import UserCRUD
app = FastAPI()
class UserCreate(BaseModel):
username: str
email: str
class UserResponse(BaseModel):
id: int
username: str
email: str
class Config:
orm_mode = True
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
try:
db_user = await UserCRUD.create_user(
username=user.username,
email=user.email
)
return db_user
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = await UserCRUD.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.get("/users/", response_model=List[UserResponse])
async def list_users(limit: int = 10, offset: int = 0):
return await UserCRUD.list_users(limit=limit, offset=offset)
```
## Decision Tree
```
Need database operations?
├── Simple CRUD operations?
│ ├── Single model → Use basic async session
│ └── Multiple models → Use transaction blocks
├── Complex queries?
│ ├── Joins needed → Use selectinload/joinedload
│ ├── Aggregations → Use func and scalar()
│ └── Raw SQL → Use text() with async execution
├── High concurrency?
│ ├── Connection pooling → Configure pool_size/max_overflow
│ ├── Session management → Use dependency injection
│ └── Background tasks → Use separate session per task
└── Performance critical?
├── Query optimization → Use explain, indexing
├── Lazy loading → Configure relationship loading
└── Caching → Implement query result caching
```
## Performance Optimization
```python
# optimization.py
from sqlalchemy import text
from sqlalchemy.orm import selectinload, joinedload
from config import async_session, engine
# Efficient bulk operations
async def bulk_insert_users(user_data: List[dict]):
async with async_session() as session:
session.add_all([User(**data) for data in user_data])
await session.commit()
# Raw SQL for performance
async def raw_query_example():
async with engine.begin() as conn:
result = await conn.execute(
text("SELECT * FROM users WHERE created_at > :date"),
{"date": "2023-01-01"}
)
return result.fetchall()
# Optimize N+1 queries
async def optimized_user_posts():
async with async_session() as session:
stmt = select(User).options(selectinload(User.posts))
result = await session.execute(stmt)
users = result.scalars().all()
# Now access posts without additional queries
for user in users:
print(f"{user.username}: {len(user.posts)} posts")
```
## Troubleshooting
### Common Errors and Fixes
**Error**: `RuntimeError: asyncio.run() cannot be called from a running event loop`
```python
# Wrong
asyncio.run(some_async_function())
# Correct - use await in async context
await some_async_function()
```
**Error**: `InvalidRequestError: Object is not bound to a Session`
```python
# Wrong - using object outside session
async with async_session() as session:
user = await session.get(User, 1)
# user is detached here
print(user.posts) # Error!
# Correct - access relationships within session or use expire_on_commit=False
async with async_session() as session:
user = await session.get(User, 1)
posts = user.posts # Access within session
```
**Error**: `AttributeError: 'AsyncSession' object has no attribute 'query'`
```python
# Wrong - using sync ORM patterns
session.query(User).all()
# Correct - use select() with execute()
stmt = select(User)
result = await session.execute(stmt)
users = result.scalars().all()
```
**Connection Pool Issues**:
```python
# Add connection pool monitoring
engine = create_async_engine(
DATABASE_URL,
pool_pre_ping=True, # Verify connections
pool_recycle=300, # Recycle every 5 minutes
echo_pool=True # Log pool events
)
```
curl -s https://skills.skynet.ceo/api/skills/sqlalchemy-async/skill.md