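One-sentence summary
Use SQLAlchemy 2.0's asyncio mode to turn FastAPI's concurrency into real database throughput, then layer on connection pooling, transactions, migrations, and tests so the service can ship without the usual pitfalls.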
1. Why an "async ORM"?
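| Scenario | Sync SQLAlchemy | Async SQLAlchemy |
|---|---|---|
| 100 concurrent uploads | 100 threads → 100 connections → the database is overwhelmed | A single thread with 20 connections can saturate the CPU |
| Request waiting on I/O | Thread context switch, about 8 ms (author's figure) | Coroutine switch, about 0.3 ms (author's figure) |
| Code style | `run_in_threadpool` scattered everywhere | Native `await` from top to bottom |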
In short: a synchronous database layer blocks FastAPI's event loop and throws away the concurrency the framework was chosen for.
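To make the blocking effect concrete, here is a minimal sketch that uses only the standard library. `blocking_query` and `async_query` are hypothetical stand-ins for a sync and an async database call; no real database or SQLAlchemy API is involved, and the delays are arbitrary.

```python
import asyncio
import time


async def blocking_query(delay: float) -> None:
    # A sync driver call inside a coroutine: the whole event loop stalls.
    time.sleep(delay)


async def async_query(delay: float) -> None:
    # An awaited call: the event loop is free to run other coroutines.
    await asyncio.sleep(delay)


async def run_batch(query, n: int = 10, delay: float = 0.02) -> float:
    """Run n 'queries' concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(query(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(run_batch(blocking_query))
async_elapsed = asyncio.run(run_batch(async_query))
print(f"blocking: {blocking_elapsed:.3f}s, async: {async_elapsed:.3f}s")
```

The blocking batch takes roughly `n * delay` because the calls run one after another, while the awaited batch takes roughly one `delay` because the waits overlap.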
2. Minimal runnable version (MVP)
Install dependencies
```bash
pip install "fastapi[all]" \
    "sqlalchemy[asyncio]>=2.0" \
    asyncpg alembic "pydantic[email]"
```
The examples use PostgreSQL; for MySQL, swap the driver for `asyncmy`.
Project skeleton
```text
app/
 ├─ api/
 │   └─ user.py
 ├─ core/
 │   ├─ db.py
 │   ├─ deps.py
 │   └─ config.py
 ├─ models/
 │   └─ user.py
 ├─ schemas/
 │   └─ user.py
 └─ main.py
```
3. Core code: the Session lifecycle end to end
app/core/config.py
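```python
# Pydantic v2 moved BaseSettings into the separate pydantic-settings package
# (installed by "fastapi[all]"). On Pydantic v1, import it from pydantic instead.
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str = "postgresql+asyncpg://user:pass@localhost:5432/demo"
    pool_size: int = 20
    max_overflow: int = 0
    echo_sql: bool = False


settings = Settings()
```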
app/core/db.py
```python
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from app.core.config import settings


class AsyncDatabaseSession:
    def __init__(self, url: str, *, pool_size: int = 20, max_overflow: int = 0, echo: bool = False):
        self.engine: AsyncEngine = create_async_engine(
            url,
            pool_size=pool_size,
            max_overflow=max_overflow,
            echo=echo,
            pool_pre_ping=True,  # test each connection on checkout, replace stale ones
        )
        self.session_factory = async_sessionmaker(
            self.engine,
            expire_on_commit=False,  # keep loaded attributes usable after commit
            class_=AsyncSession,
        )

    async def close(self) -> None:
        await self.engine.dispose()


db = AsyncDatabaseSession(
    settings.database_url,
    pool_size=settings.pool_size,
    max_overflow=settings.max_overflow,
    echo=settings.echo_sql,
)
```
app/main.py
```python
from fastapi import FastAPI

from app.api import user
from app.core.db import db

app = FastAPI(title="Async SQLAlchemy Demo")

app.include_router(user.router)


# Note: newer FastAPI releases deprecate on_event in favor of a lifespan handler.
@app.on_event("startup")
async def startup():
    # Optional: create tables
    # from app.models.user import Base
    # async with db.engine.begin() as conn:
    #     await conn.run_sync(Base.metadata.create_all)
    pass


@app.on_event("shutdown")
async def shutdown():
    await db.close()
```
4. Dependency injection: one Session per request, automatic rollback
app/core/deps.py
```python
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.db import db


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    # `async with` closes the session on exit, so no explicit close() is needed.
    async with db.session_factory() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
```
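Wrapping `yield` in a try/except gives each request its own session and rolls it back automatically when the handler raises. Note that this dependency never commits on its own: handlers call `await session.commit()` explicitly, as `create_user` does in the next section.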
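If you would rather have the dependency itself commit on success, the unit-of-work shape could look like the sketch below. It runs on the standard library only: `FakeSession` is a hypothetical stand-in that records calls instead of talking to a database, and `session_scope` is an illustrative name, not part of SQLAlchemy or FastAPI.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession):
    try:
        yield session
        await session.commit()    # reached only if the body did not raise
    except Exception:
        await session.rollback()  # undo partial work, then re-raise
        raise
    finally:
        await session.close()     # always release the session


async def demo():
    ok = FakeSession()
    async with session_scope(ok):
        pass  # a handler that succeeds

    failed = FakeSession()
    try:
        async with session_scope(failed):
            raise ValueError("handler failed")
    except ValueError:
        pass
    return ok.calls, failed.calls


ok_calls, failed_calls = asyncio.run(demo())
print(ok_calls, failed_calls)
```

With this shape, handlers would stop calling `commit()` themselves; mixing both styles works but makes it harder to see where a transaction ends.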
5. Model / Schema / CRUD end to end
app/models/user.py
```python
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
    full_name: Mapped[str | None]
```
app/schemas/user.py
```python
from pydantic import BaseModel, ConfigDict, EmailStr


class UserCreate(BaseModel):
    email: EmailStr
    full_name: str | None = None


class UserRead(BaseModel):
    # Pydantic v2 name for v1's `orm_mode = True`.
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: EmailStr
    full_name: str | None
```
app/api/user.py
```python
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.deps import get_session
from app.models.user import User
from app.schemas.user import UserCreate, UserRead

router = APIRouter(prefix="/users", tags=["users"])


# 201 Created matches the status code the test suite expects.
@router.post("", response_model=UserRead, status_code=status.HTTP_201_CREATED)
async def create_user(payload: UserCreate, session: AsyncSession = Depends(get_session)):
    user = User(**payload.model_dump())  # Pydantic v2; on v1 use payload.dict()
    session.add(user)
    await session.flush()  # assigns the primary key
    await session.commit()
    await session.refresh(user)
    return user


@router.get("/{uid}", response_model=UserRead)
async def read_user(uid: int, session: AsyncSession = Depends(get_session)):
    user = await session.get(User, uid)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```
6. Migrations: Alembic can run async too
Initialize
```bash
alembic init -t async migrations
```
Set `sqlalchemy.url` in alembic.ini to postgresql+asyncpg://...
migrations/env.py
```python
import asyncio

from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine

from app.core.config import settings
from app.models.user import Base

target_metadata = Base.metadata


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations():
    # create_async_engine() already returns an AsyncEngine; no extra wrapping needed.
    connectable = create_async_engine(settings.database_url)
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


# Online mode only; the offline branch from the async template is omitted here.
asyncio.run(run_async_migrations())
```
Generate / upgrade
```bash
alembic revision --autogenerate -m "init"
alembic upgrade head
```
7. Testing: pytest-asyncio + async database transactions
tests/conftest.py
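```python
from typing import AsyncGenerator

import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.pool import NullPool

from app.core.deps import get_session
from app.main import app


@pytest_asyncio.fixture
async def engine() -> AsyncGenerator[AsyncEngine, None]:
    # In-memory SQLite can run async too, but PostgreSQL is closer to production.
    # NullPool avoids reusing connections across per-test event loops.
    engine = create_async_engine(
        "postgresql+asyncpg://test:test@localhost:5432/test",
        poolclass=NullPool,
    )
    yield engine
    await engine.dispose()


@pytest_asyncio.fixture
async def session(engine: AsyncEngine) -> AsyncGenerator[AsyncSession, None]:
    async with engine.connect() as conn:
        trans = await conn.begin()
        # Handler commits only release a SAVEPOINT, so the rollback below undoes them.
        sess = AsyncSession(bind=conn, expire_on_commit=False, join_transaction_mode="create_savepoint")
        yield sess
        await sess.close()
        await trans.rollback()


@pytest_asyncio.fixture
async def client(session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    async def override_get_session():
        yield session  # route handlers share the test's transactional session

    app.dependency_overrides[get_session] = override_get_session
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
        yield c
    app.dependency_overrides.clear()
```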
tests/test_user.py
```python
import pytest

from app.models.user import User


@pytest.mark.asyncio
async def test_create_user(client, session):
    # The address in the source was obfuscated; this one is a placeholder.
    payload = {"email": "user@example.com", "full_name": "abc"}
    res = await client.post("/users", json=payload)
    # Assumes the POST route is declared with status_code=201.
    assert res.status_code == 201
    data = res.json()
    assert data["email"] == payload["email"]

    user = await session.get(User, data["id"])
    assert user is not None
```
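The idea behind transaction-per-test isolation can be seen without any async machinery. The sketch below uses the standard library's synchronous `sqlite3` module as a stand-in (an assumption made for the demo, not the stack used in this article): the test opens an outer transaction, the "handler" commit is downgraded to a savepoint release, and the teardown rollback discards everything. The savepoint name and the e-mail address are made up.

```python
import sqlite3

# isolation_level=None puts the stdlib driver in autocommit mode, so the
# BEGIN / SAVEPOINT / ROLLBACK statements below are exactly what gets executed.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")


def count_users() -> int:
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


conn.execute("BEGIN")                     # outer transaction opened by the fixture
conn.execute("SAVEPOINT handler_commit")  # the handler's "commit" becomes a savepoint
conn.execute("INSERT INTO users (email) VALUES ('user@example.com')")
conn.execute("RELEASE handler_commit")    # the handler believes it committed
rows_during_test = count_users()          # the test can see the new row

conn.execute("ROLLBACK")                  # fixture teardown discards everything
rows_after_test = count_users()           # the next test starts from a clean table
conn.close()

print(rows_during_test, rows_after_test)
```

Because nothing is ever committed to the outer transaction, tests do not need to truncate tables between runs.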
8. Performance tuning checklist
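| Parameter | Suggested value | Notes |
|---|---|---|
| pool_size | CPU cores × 2 | The author reports reaching about 10k RPS with 20 connections |
| max_overflow | 0 | Stops traffic bursts from opening extra connections and overwhelming the database |
| pool_pre_ping=True | Required | Connections dropped by a network blip are detected on checkout and replaced |
| expire_on_commit=False | Required | Otherwise attributes expire on commit, and reading them triggers implicit I/O |
| echo=False | Off in production | Avoids the overhead of logging every SQL statement |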
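To see what `pool_size` with `max_overflow=0` means for concurrency, the sketch below models the pool as an `asyncio.Semaphore`. This is a simplified stand-in written for illustration, not SQLAlchemy's pool implementation, and `run_with_pool` is a hypothetical helper name.

```python
import asyncio


async def run_with_pool(total_requests: int, pool_size: int) -> int:
    """Return the peak number of requests holding a 'connection' at once."""
    pool = asyncio.Semaphore(pool_size)  # stand-in for pool_size with max_overflow=0
    in_use = 0
    peak = 0

    async def handle_request() -> None:
        nonlocal in_use, peak
        async with pool:  # wait here until a connection slot is free
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.001)  # pretend to run a query
            in_use -= 1

    await asyncio.gather(*(handle_request() for _ in range(total_requests)))
    return peak


peak_connections = asyncio.run(run_with_pool(total_requests=100, pool_size=20))
print(peak_connections)
```

All 100 requests complete, but no more than 20 ever hold a connection at the same time; the rest queue inside the application instead of opening new connections against the database.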
9. Common errors quick reference
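| Exception | Cause | Fix |
|---|---|---|
| greenlet_spawn has not been called | A sync engine is in use, or a lazy load triggered implicit I/O outside an `await` | Use `create_async_engine`; load related data eagerly |
| DetachedInstanceError | An attribute was accessed after the session closed | `expire_on_commit=False` plus `await session.refresh()` |
| InterfaceError: connection already closed | A Session was reused across coroutines | One Session per request; never a global singleton |
| ImportError: asyncmy | The MySQL driver is not installed | `pip install asyncmy` |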
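The "one Session per request" rule can be illustrated with a toy model. `FakeSession` below is a hypothetical stand-in that refuses concurrent use; a real `AsyncSession` raises a different error, but the underlying constraint is the same: a session is not safe to share between concurrently running tasks.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for AsyncSession: it refuses concurrent use."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, statement: str) -> str:
        if self._busy:
            raise RuntimeError("session is already in use by another task")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # pretend to wait on the database
            return f"result of {statement}"
        finally:
            self._busy = False


async def shared_session_run() -> int:
    session = FakeSession()  # one session shared by every task: the anti-pattern
    results = await asyncio.gather(
        *(session.execute(f"q{i}") for i in range(3)),
        return_exceptions=True,
    )
    return sum(isinstance(r, RuntimeError) for r in results)


async def per_task_session_run() -> list[str]:
    async def work(i: int) -> str:
        session = FakeSession()  # each task builds its own session
        return await session.execute(f"q{i}")

    return await asyncio.gather(*(work(i) for i in range(3)))


shared_errors = asyncio.run(shared_session_run())
per_task_results = asyncio.run(per_task_session_run())
print(shared_errors, per_task_results)
```

In the shared case only the first task gets through and the other two fail; with one session per task, all three succeed.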
10. Closing thoughts
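In FastAPI's async ecosystem, the database is the last gate.
With SQLAlchemy 2.0's asyncio mode in place, waiting on I/O no longer blocks the event loop, and the author reports load-test throughput improving by about an order of magnitude.
Copy the db.py and deps.py from this article and an existing project can be switched over in about ten minutes. Happy async coding!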
"FastAPI × SQLAlchemy 2.0 Async: Complete Engineering Practice from 'It Runs' to 'Load-Testable'" is a reposted article.
