字数 2957,阅读大约需 15 分钟
Python之FastAPI的入门到精通系列:asyncpg异步连接管理数据库Postgres

代码地址:[email protected]:FunkyGod/fastapi-demo.git
asyncpg
今天演示的是异步连接DB, 在async里协程最好是使用异步库连接DB,避免阻碍事件循环:核心库是asyncpg
版本要求python 3.9+ , postgres in [9.5, 17]

1M rows/s from Postgres to Python — magicstack[1]
主要围绕异步使用数据库展开介绍,并包含以下6种测试验证:
1. 直接 asyncpg 连接: ✅ 通过
2. SQLAlchemy 异步引擎连接: ✅ 通过
3. 异步会话功能: ✅ 通过
4. 连接池功能: ✅ 通过
5. 异步依赖注入函数: ✅ 通过
6. 测试get_async_db_session函数: ✅ 通过
官方用法
asyncpg 是一个为 Python 的 asyncio 框架设计的、功能强大的 PostgreSQL 数据库客户端/驱动库。它的核心目标是提供一个与 PostgreSQL 数据库进行高性能异步交互的接口。
import asyncio
import asyncpg
async def run():
conn = await asyncpg.connect(user='user', password='password',
database='database', host='127.0.0.1')
values = await conn.fetch(
'SELECT * FROM mytable WHERE id = $1',
10,
)
await conn.close()
asyncio.run(run())异步引擎、线程池、上下文管理
在FastAPI 项目中引入 asyncpg 的好处:
充分发挥 FastAPI 的性能:FastAPI 是一个异步框架,如果在路由函数中执行同步的数据库查询(如使用 psycopg2),将会阻塞整个事件循环,导致并发性能下降。切换到 asyncpg 可以实现端到端的异步处理,从接收请求到数据库查询全程非阻塞,从而最大化应用的并发处理能力。
更高效的资源利用:在等待数据库 I/O 时,异步驱动不会占用线程资源。在高并发场景下,这意味着可以用更少的进程和内存来服务更多的用户请求,降低了服务器成本。
与 SQLAlchemy 2.0 无缝集成:SQLAlchemy 2.0 的架构原生支持异步操作。通过简单的配置变更,就可以让现有的 SQLAlchemy 模型和查询语句在 asyncpg 上运行,而无需大规模重构代码。
1. 创建异步引擎
2. 异步session工厂
3. 异步上下文管理
创建异步数据库引擎
# 创建异步数据库引擎
# 引擎是 SQLAlchemy 异步应用的起点,它管理着与数据库的连接池和方言。
async_engine: AsyncEngine = create_async_engine(
# 数据库连接字符串,从配置中读取
str(settings.SQLALCHEMY_DATABASE_URI).replace("postgresql://", "postgresql+asyncpg://"),
# 启用连接池预检测,每次从连接池获取连接时,会发送一个简单的查询以检查连接是否有效
pool_pre_ping=True,
# 自定义的 JSON 序列化函数
json_serializer=_custom_json_serializer,
# 连接池大小
pool_size=settings.SQLALCHEMY_POOL_SIZE,
# 连接回收时间(秒),-1 表示不回收
pool_recycle=settings.SQLALCHEMY_POOL_RECYCLE,
# 获取连接的超时时间(秒)
pool_timeout=settings.SQLALCHEMY_POOL_TIMEOUT,
# 连接池中允许超过 pool_size 的最大连接数
max_overflow=settings.SQLALCHEMY_POOL_OVERFLOW,
# 传递给数据库驱动的额外参数
connect_args=dict(
server_settings={
# 设置 PostgreSQL 的事务中空闲会话的超时时间
"jit": "off",
"idle_in_transaction_session_timeout":
f"{settings.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT}",
}
),
)创建异步 Session 工厂
用于后续创建独立的数据库会话实例
# autocommit=False 和 autoflush=False 是 SQLAlchemy 推荐的默认设置,
# 这样可以手动控制事务的提交和刷新。
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
expire_on_commit=False
)异步数据库会话上下文
@asynccontextmanager
async def get_async_session():
"""
创建一个异步数据库会话的上下文管理器。
用法:
async with get_async_session() as db:
result = await db.execute(...)
该方法利用 @asynccontextmanager 装饰器和 try...finally 结构,
确保数据库会话在使用完毕后,无论是否发生异常,
其 close() 方法都能被调用,从而安全地关闭和回收连接资源。
这是一种推荐的数据库会话管理模式,可以有效避免资源泄露。
"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
# 此处的 close() 方法并不会直接关闭数据库的物理连接,
# 而是将本次会话(Session)所使用的连接释放,归还到连接池(Connection Pool)中。
# 这样做是为了复用连接,避免了频繁创建和销毁数据库连接带来的性能开销。
await session.close()测试异步数据库连接
1. 直接 asyncpg 连接: ✅ 通过
2. SQLAlchemy 异步引擎连接: ✅ 通过
3. 异步会话功能: ✅ 通过
4. 连接池功能: ✅ 通过
5. 异步依赖注入函数: ✅ 通过
6. 测试get_async_db_session函数: ✅ 通过
测试函数
async def test_async_connection() -> bool:
"""
测试异步数据库连接是否成功
"""
try:
conn = await asyncpg.connect(
host=settings.POSTGRES_SERVER,
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
database=settings.POSTGRES_DB,
)
try:
# 执行一个简单的查询来测试连接
result = await conn.fetchval("SELECT 1")
return result == 1
finally:
await conn.close()
except Exception as e:
print(f"异步数据库连接测试失败: {e}")
return False
async def test_async_engine_connection() -> bool:
"""
使用 SQLAlchemy 异步引擎测试数据库连接
"""
try:
async with async_engine.begin() as conn:
result = await conn.execute(text("SELECT 1"))
return result.scalar() == 1
except Exception as e:
print(f"异步引擎连接测试失败: {e}")
return False
# 创建一个全局的异步会话依赖注入函数
async def get_async_db_session() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI 依赖注入函数 - 获取异步数据库会话
在 FastAPI 路由函数中使用:
@app.get("/users/")
async def get_users(db: AsyncSession = Depends(get_async_db_session)):
# 使用 db 进行数据库操作
pass
"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()测试用例
#!/usr/bin/env python3
"""
异步数据库连接测试脚本
该脚本用于测试 asyncpg 异步数据库连接是否正常工作。
"""
import asyncio
import os
# 添加项目根目录到 Python 路径
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.core.config import settings
from app.db.async_db import test_async_connection, test_async_engine_connection
async def test_direct_asyncpg_connection():
"""测试直接使用 asyncpg 的连接"""
print("=" * 50)
print("测试 1: 直接使用 asyncpg 连接")
print("=" * 50)
print(f"数据库配置:")
print(f" POSTGRES_SERVER: {settings.POSTGRES_SERVER}")
print(f" POSTGRES_USER: {settings.POSTGRES_USER}")
print(f" POSTGRES_DB: {settings.POSTGRES_DB}")
print()
try:
success = await test_async_connection()
if success:
print("✅ 直接 asyncpg 连接测试成功!")
return True
else:
print("❌ 直接 asyncpg 连接测试失败!")
return False
except Exception as e:
print(f"❌ 直接 asyncpg 连接测试异常: {e}")
return False
async def test_async_engine_connection_test():
"""测试 SQLAlchemy 异步引擎连接"""
print("\n" + "=" * 50)
print("测试 2: SQLAlchemy 异步引擎连接")
print("=" * 50)
try:
success = await test_async_engine_connection()
if success:
print("✅ SQLAlchemy 异步引擎连接测试成功!")
return True
else:
print("❌ SQLAlchemy 异步引擎连接测试失败!")
return False
except Exception as e:
print(f"❌ SQLAlchemy 异步引擎连接测试异常: {e}")
return False
async def test_async_session():
"""测试异步会话功能"""
print("\n" + "=" * 50)
print("测试 3: 异步会话功能")
print("=" * 50)
try:
from app.db.async_db import get_async_session, AsyncSessionLocal
from sqlalchemy import text
# 使用上下文管理器测试
async with get_async_session() as session:
# 执行测试查询
result = await session.execute(text("SELECT version()"))
version = result.scalar()
print(f"✅ PostgreSQL 版本: {version}")
# 测试事务
await session.execute(text("SELECT 1"))
await session.commit()
print("✅ 事务测试成功!")
return True
except Exception as e:
print(f"❌ 异步会话测试异常: {e}")
return False
async def test_connection_pool():
"""测试连接池功能"""
print("\n" + "=" * 50)
print("测试 4: 连接池功能")
print("=" * 50)
try:
from app.db.async_db import AsyncSessionLocal, async_engine
# 检查连接池配置
pool_size = async_engine.pool.size()
checked_in = async_engine.pool.checkedin()
checked_out = async_engine.pool.checkedout()
max_size = getattr(async_engine.pool, "maxsize", "N/A")
print(f"连接池状态:")
print(f" 当前连接数: {pool_size}")
print(f" 已归还连接: {checked_in}")
print(f" 已借出连接: {checked_out}")
print(f" 最大连接数: {max_size}")
print()
# 测试多个并发连接
from sqlalchemy import text
async def single_connection_test():
async with AsyncSessionLocal() as session:
await session.execute(text("SELECT 1"))
await asyncio.sleep(0.1) # 模拟一些工作
return "成功"
# 并发测试
tasks = [single_connection_test() for _ in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(1 for r in results if r == "成功")
print(f"✅ 并发连接测试: {success_count}/{len(tasks)} 成功")
return success_count == len(tasks)
except Exception as e:
print(f"❌ 连接池测试异常: {e}")
return False
async def test_async_dependency_injection():
"""测试异步依赖注入函数"""
print("\n" + "=" * 50)
print("测试 5: 异步依赖注入函数")
print("=" * 50)
try:
from app.api.deps import get_async_db
from sqlalchemy import text
# 测试 get_async_db 函数是否工作正常
async for session in get_async_db():
try:
# 执行测试查询
result = await session.execute(text("SELECT current_database(), current_user"))
db_name, user = result.fetchone()
print(f"✅ 依赖注入测试成功 - 数据库: {db_name}, 用户: {user}")
# 测试事务
await session.execute(text("SELECT 1"))
print("✅ 依赖注入事务测试成功")
return True
except Exception as e:
print(f"❌ 依赖注入测试失败: {e}")
return False
except Exception as e:
print(f"❌ 依赖注入函数测试异常: {e}")
return False
async def test_get_async_db_session_function():
"""测试 get_async_db_session 函数"""
print("\n" + "=" * 50)
print("测试 6: get_async_db_session 函数")
print("=" * 50)
try:
from app.db.async_db import get_async_db_session
from sqlalchemy import text
# 测试 get_async_db_session 函数是否工作正常
async for session in get_async_db_session():
try:
# 执行测试查询
result = await session.execute(text("SELECT current_database(), current_user, version()"))
db_name, user, version = result.fetchone()
print(f"✅ get_async_db_session 测试成功")
print(f" 数据库: {db_name}")
print(f" 用户: {user}")
print(f" 版本: {version.split(',')[0]}") # 只显示版本号部分
# 测试事务
await session.execute(text("SELECT 1"))
print("✅ 事务测试成功")
return True
except Exception as e:
print(f"❌ get_async_db_session 测试失败: {e}")
return False
except Exception as e:
print(f"❌ get_async_db_session 函数测试异常: {e}")
return False
async def main():
"""主测试函数"""
print("🚀 开始异步数据库连接测试...")
print(f"📋 项目: {settings.PROJECT_NAME}")
print()
# 检查环境变量
required_vars = ["POSTGRES_SERVER", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB"]
missing_vars = [var for var in required_vars if not getattr(settings, var, None)]
if missing_vars:
print(f"❌ 缺少必要的环境变量: {missing_vars}")
print("请确保在 .env 文件中配置这些变量")
return
# 运行所有测试
tests = [
("直接 asyncpg 连接", test_direct_asyncpg_connection),
("SQLAlchemy 异步引擎连接", test_async_engine_connection_test),
("异步会话功能", test_async_session),
("连接池功能", test_connection_pool),
("异步依赖注入函数", test_async_dependency_injection),
("测试get_async_db_session函数", test_get_async_db_session_function),
]
results = []
for test_name, test_func in tests:
try:
result = await test_func()
results.append((test_name, result))
except Exception as e:
print(f"❌ {test_name} 测试异常: {e}")
results.append((test_name, False))
# 汇总结果
print("\n" + "=" * 50)
print("📊 测试结果汇总")
print("=" * 50)
success_count = 0
for test_name, result in results:
status = "✅ 通过" if result else "❌ 失败"
print(f"{test_name}: {status}")
if result:
success_count += 1
print(f"\n总计: {success_count}/{len(results)} 测试通过")
if success_count == len(results):
print("🎉 所有测试通过! 异步数据库连接配置成功!")
return True
else:
print("⚠️ 部分测试失败,请检查配置")
return False
if __name__ == "__main__":
# 运行测试
success = asyncio.run(main())
sys.exit(0 if success else 1)
执行结果
🚀 开始异步数据库连接测试...
📋 项目: fastapi
==================================================
测试 1: 直接使用 asyncpg 连接
==================================================
数据库配置:
POSTGRES_SERVER: db
POSTGRES_USER: service
POSTGRES_DB: fastapi_demo_db
✅ 直接 asyncpg 连接测试成功!
==================================================
测试 2: SQLAlchemy 异步引擎连接
==================================================
✅ SQLAlchemy 异步引擎连接测试成功!
==================================================
测试 3: 异步会话功能
==================================================
✅ PostgreSQL 版本: PostgreSQL 12.22 (Debian 12.22-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
✅ 事务测试成功!
==================================================
测试 4: 连接池功能
==================================================
连接池状态:
当前连接数: 60
已归还连接: 1
已借出连接: 0
最大连接数: N/A
✅ 并发连接测试: 10/10 成功
==================================================
测试 5: 异步依赖注入函数
==================================================
✅ 依赖注入测试成功 - 数据库: fastapi_demo_db, 用户: service
✅ 依赖注入事务测试成功
==================================================
测试 6: get_async_db_session 函数
==================================================
✅ get_async_db_session 测试成功
数据库: fastapi_demo_db
用户: service
版本: PostgreSQL 12.22 (Debian 12.22-1.pgdg120+1) on x86_64-pc-linux-gnu
✅ 事务测试成功
==================================================
📊 测试结果汇总
==================================================
直接 asyncpg 连接: ✅ 通过
SQLAlchemy 异步引擎连接: ✅ 通过
异步会话功能: ✅ 通过
连接池功能: ✅ 通过
异步依赖注入函数: ✅ 通过
测试get_async_db_session函数: ✅ 通过
总计: 6/6 测试通过
🎉 所有测试通过! 异步数据库连接配置成功!结尾彩蛋
刚刷到的朋友注意啦!点击【关注】锁定宝藏库,从此升职加薪不迷路 ✨
若觉得内容有用,长按点赞3秒引爆爱心特效!你的每次互动,都是我深夜码字的星光 🌟
【三步操作,终身受益】
✅ 点击「关注」→ 持续收获成长能量
✅ 点亮「点赞」→ 为干货内容打call
✅ 设为「星标」⭐️→ 算法优先推送,更新不错过
广告时刻
📢 云资源限时福利
有云服务器、CDN、对象存储、网络防护等需求的朋友,欢迎联系下方腾讯云官方销售 👇
✔️ 内部专属折扣,价格更优
✔️ 量大可谈,支持定制方案
✔️ 技术咨询与售后无忧

引用链接
[1] 1M rows/s from Postgres to Python — magicstack: http://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/
评论区