目 录CONTENT

文章目录

Python之FastAPI的入门到精通系列:asyncpg异步连接管理数据库Postgres

Administrator
2025-11-05 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

 

字数 2957,阅读大约需 15 分钟

Python之FastAPI的入门到精通系列:asyncpg异步连接管理数据库Postgres


代码地址:[email protected]:FunkyGod/fastapi-demo.git

asyncpg

今天演示的是异步连接DB, 在async里协程最好是使用异步库连接DB,避免阻碍事件循环:核心库是asyncpg
版本要求python 3.9+ , postgres in [9.5, 17]

1M rows/s from Postgres to Python — magicstack[1]
主要围绕异步使用数据库展开介绍,并包含以下6种测试验证:

  1. 1. 直接 asyncpg 连接: ✅ 通过

  2. 2. SQLAlchemy 异步引擎连接: ✅ 通过

  3. 3. 异步会话功能: ✅ 通过

  4. 4. 连接池功能: ✅ 通过

  5. 5. 异步依赖注入函数: ✅ 通过

  6. 6. 测试get_async_db_session函数: ✅ 通过

官方用法

asyncpg 是一个为 Python 的 asyncio 框架设计的、功能强大的 PostgreSQL 数据库客户端/驱动库。它的核心目标是提供一个与 PostgreSQL 数据库进行高性能异步交互的接口。


    
    
    
  import asyncio
import asyncpg

async def run():
    conn = await asyncpg.connect(user='user', password='password',
                                 database='database', host='127.0.0.1')
    values = await conn.fetch(
        'SELECT * FROM mytable WHERE id = $1',
        10,
    )
    await conn.close()

asyncio.run(run())

异步引擎、线程池、上下文管理

在FastAPI 项目中引入 asyncpg 的好处:
充分发挥 FastAPI 的性能:FastAPI 是一个异步框架,如果在路由函数中执行同步的数据库查询(如使用 psycopg2),将会阻塞整个事件循环,导致并发性能下降。切换到 asyncpg 可以实现端到端的异步处理,从接收请求到数据库查询全程非阻塞,从而最大化应用的并发处理能力。

更高效的资源利用:在等待数据库 I/O 时,异步驱动不会占用线程资源。在高并发场景下,这意味着可以用更少的进程和内存来服务更多的用户请求,降低了服务器成本。

SQLAlchemy 2.0 无缝集成:SQLAlchemy 2.0 的架构原生支持异步操作。通过简单的配置变更,就可以让现有的 SQLAlchemy 模型和查询语句在 asyncpg 上运行,而无需大规模重构代码。

1. 创建异步引擎
2. 异步session工厂
3. 异步上下文管理

创建异步数据库引擎


    
    
    
  # 创建异步数据库引擎
# 引擎是 SQLAlchemy 异步应用的起点,它管理着与数据库的连接池和方言。
async_engine: AsyncEngine = create_async_engine(
    # 数据库连接字符串,从配置中读取
    str(settings.SQLALCHEMY_DATABASE_URI).replace("postgresql://", "postgresql+asyncpg://"),
    # 启用连接池预检测,每次从连接池获取连接时,会发送一个简单的查询以检查连接是否有效
    pool_pre_ping=True,
    # 自定义的 JSON 序列化函数
    json_serializer=_custom_json_serializer,
    # 连接池大小
    pool_size=settings.SQLALCHEMY_POOL_SIZE,
    # 连接回收时间(秒),-1 表示不回收
    pool_recycle=settings.SQLALCHEMY_POOL_RECYCLE,
    # 获取连接的超时时间(秒)
    pool_timeout=settings.SQLALCHEMY_POOL_TIMEOUT,
    # 连接池中允许超过 pool_size 的最大连接数
    max_overflow=settings.SQLALCHEMY_POOL_OVERFLOW,
    # 传递给数据库驱动的额外参数
    connect_args=dict(
        server_settings={
            # 设置 PostgreSQL 的事务中空闲会话的超时时间
            "jit": "off",
            "idle_in_transaction_session_timeout":
            f"{settings.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT}",
        }
    ),
)

创建异步 Session 工厂

用于后续创建独立的数据库会话实例


    
    
    
  # autocommit=False 和 autoflush=False 是 SQLAlchemy 推荐的默认设置,
# 这样可以手动控制事务的提交和刷新。
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine, 
    class_=AsyncSession, 
    autocommit=False, 
    autoflush=False,
    expire_on_commit=False
)

异步数据库会话上下文


    
    
    
  @asynccontextmanager
async def get_async_session():
    """
    创建一个异步数据库会话的上下文管理器。

    用法:
        async with get_async_session() as db:
            result = await db.execute(...)

    该方法利用 @asynccontextmanager 装饰器和 try...finally 结构,
    确保数据库会话在使用完毕后,无论是否发生异常,
    其 close() 方法都能被调用,从而安全地关闭和回收连接资源。
    这是一种推荐的数据库会话管理模式,可以有效避免资源泄露。
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            # 此处的 close() 方法并不会直接关闭数据库的物理连接,
            # 而是将本次会话(Session)所使用的连接释放,归还到连接池(Connection Pool)中。
            # 这样做是为了复用连接,避免了频繁创建和销毁数据库连接带来的性能开销。
            await session.close()

测试异步数据库连接

  1. 1. 直接 asyncpg 连接: ✅ 通过

  2. 2. SQLAlchemy 异步引擎连接: ✅ 通过

  3. 3. 异步会话功能: ✅ 通过

  4. 4. 连接池功能: ✅ 通过

  5. 5. 异步依赖注入函数: ✅ 通过

  6. 6. 测试get_async_db_session函数: ✅ 通过

测试函数


    
    
    
  async def test_async_connection() -> bool:
    """
    测试异步数据库连接是否成功
    """
    try:
        conn = await asyncpg.connect(
            host=settings.POSTGRES_SERVER,
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            database=settings.POSTGRES_DB,
        )
        try:
            # 执行一个简单的查询来测试连接
            result = await conn.fetchval("SELECT 1")
            return result == 1
        finally:
            await conn.close()
    except Exception as e:
        print(f"异步数据库连接测试失败: {e}")
        return False


async def test_async_engine_connection() -> bool:
    """
    使用 SQLAlchemy 异步引擎测试数据库连接
    """
    try:
        async with async_engine.begin() as conn:
            result = await conn.execute(text("SELECT 1"))
            return result.scalar() == 1
    except Exception as e:
        print(f"异步引擎连接测试失败: {e}")
        return False


# 创建一个全局的异步会话依赖注入函数
async def get_async_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI 依赖注入函数 - 获取异步数据库会话
    在 FastAPI 路由函数中使用:
    
    @app.get("/users/")
    async def get_users(db: AsyncSession = Depends(get_async_db_session)):
        # 使用 db 进行数据库操作
        pass
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

测试用例


    
    
    
  #!/usr/bin/env python3
"""
异步数据库连接测试脚本

该脚本用于测试 asyncpg 异步数据库连接是否正常工作。
"""

import asyncio
import os

# 添加项目根目录到 Python 路径
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.core.config import settings
from app.db.async_db import test_async_connection, test_async_engine_connection


async def test_direct_asyncpg_connection():
    """测试直接使用 asyncpg 的连接"""
    print("=" * 50)
    print("测试 1: 直接使用 asyncpg 连接")
    print("=" * 50)

    print(f"数据库配置:")
    print(f"  POSTGRES_SERVER: {settings.POSTGRES_SERVER}")
    print(f"  POSTGRES_USER: {settings.POSTGRES_USER}")
    print(f"  POSTGRES_DB: {settings.POSTGRES_DB}")
    print()

    try:
        success = await test_async_connection()
        if success:
            print("✅ 直接 asyncpg 连接测试成功!")
            return True
        else:
            print("❌ 直接 asyncpg 连接测试失败!")
            return False
    except Exception as e:
        print(f"❌ 直接 asyncpg 连接测试异常: {e}")
        return False


async def test_async_engine_connection_test():
    """测试 SQLAlchemy 异步引擎连接"""
    print("\n" + "=" * 50)
    print("测试 2: SQLAlchemy 异步引擎连接")
    print("=" * 50)

    try:
        success = await test_async_engine_connection()
        if success:
            print("✅ SQLAlchemy 异步引擎连接测试成功!")
            return True
        else:
            print("❌ SQLAlchemy 异步引擎连接测试失败!")
            return False
    except Exception as e:
        print(f"❌ SQLAlchemy 异步引擎连接测试异常: {e}")
        return False


async def test_async_session():
    """测试异步会话功能"""
    print("\n" + "=" * 50)
    print("测试 3: 异步会话功能")
    print("=" * 50)

    try:
        from app.db.async_db import get_async_session, AsyncSessionLocal
        from sqlalchemy import text

        # 使用上下文管理器测试
        async with get_async_session() as session:
            # 执行测试查询
            result = await session.execute(text("SELECT version()"))
            version = result.scalar()
            print(f"✅ PostgreSQL 版本: {version}")

            # 测试事务
            await session.execute(text("SELECT 1"))
            await session.commit()
            print("✅ 事务测试成功!")

            return True

    except Exception as e:
        print(f"❌ 异步会话测试异常: {e}")
        return False


async def test_connection_pool():
    """测试连接池功能"""
    print("\n" + "=" * 50)
    print("测试 4: 连接池功能")
    print("=" * 50)

    try:
        from app.db.async_db import AsyncSessionLocal, async_engine

        # 检查连接池配置
        pool_size = async_engine.pool.size()
        checked_in = async_engine.pool.checkedin()
        checked_out = async_engine.pool.checkedout()
        max_size = getattr(async_engine.pool, "maxsize", "N/A")

        print(f"连接池状态:")
        print(f"  当前连接数: {pool_size}")
        print(f"  已归还连接: {checked_in}")
        print(f"  已借出连接: {checked_out}")
        print(f"  最大连接数: {max_size}")
        print()

        # 测试多个并发连接
        from sqlalchemy import text

        async def single_connection_test():
            async with AsyncSessionLocal() as session:
                await session.execute(text("SELECT 1"))
                await asyncio.sleep(0.1)  # 模拟一些工作
                return "成功"

        # 并发测试
        tasks = [single_connection_test() for _ in range(10)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        success_count = sum(1 for r in results if r == "成功")
        print(f"✅ 并发连接测试: {success_count}/{len(tasks)} 成功")

        return success_count == len(tasks)

    except Exception as e:
        print(f"❌ 连接池测试异常: {e}")
        return False


async def test_async_dependency_injection():
    """测试异步依赖注入函数"""
    print("\n" + "=" * 50)
    print("测试 5: 异步依赖注入函数")
    print("=" * 50)
    
    try:
        from app.api.deps import get_async_db
        from sqlalchemy import text
        
        # 测试 get_async_db 函数是否工作正常
        async for session in get_async_db():
            try:
                # 执行测试查询
                result = await session.execute(text("SELECT current_database(), current_user"))
                db_name, user = result.fetchone()
                print(f"✅ 依赖注入测试成功 - 数据库: {db_name}, 用户: {user}")
                
                # 测试事务
                await session.execute(text("SELECT 1"))
                print("✅ 依赖注入事务测试成功")
                
                return True
            except Exception as e:
                print(f"❌ 依赖注入测试失败: {e}")
                return False
    except Exception as e:
        print(f"❌ 依赖注入函数测试异常: {e}")
        return False


async def test_get_async_db_session_function():
    """测试 get_async_db_session 函数"""
    print("\n" + "=" * 50)
    print("测试 6: get_async_db_session 函数")
    print("=" * 50)
    
    try:
        from app.db.async_db import get_async_db_session
        from sqlalchemy import text
        
        # 测试 get_async_db_session 函数是否工作正常
        async for session in get_async_db_session():
            try:
                # 执行测试查询
                result = await session.execute(text("SELECT current_database(), current_user, version()"))
                db_name, user, version = result.fetchone()
                print(f"✅ get_async_db_session 测试成功")
                print(f"   数据库: {db_name}")
                print(f"   用户: {user}")
                print(f"   版本: {version.split(',')[0]}")  # 只显示版本号部分
                
                # 测试事务
                await session.execute(text("SELECT 1"))
                print("✅ 事务测试成功")
                
                return True
            except Exception as e:
                print(f"❌ get_async_db_session 测试失败: {e}")
                return False
    except Exception as e:
        print(f"❌ get_async_db_session 函数测试异常: {e}")
        return False


async def main():
    """主测试函数"""
    print("🚀 开始异步数据库连接测试...")
    print(f"📋 项目: {settings.PROJECT_NAME}")
    print()

    # 检查环境变量
    required_vars = ["POSTGRES_SERVER", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB"]
    missing_vars = [var for var in required_vars if not getattr(settings, var, None)]

    if missing_vars:
        print(f"❌ 缺少必要的环境变量: {missing_vars}")
        print("请确保在 .env 文件中配置这些变量")
        return

    # 运行所有测试
    tests = [
        ("直接 asyncpg 连接", test_direct_asyncpg_connection),
        ("SQLAlchemy 异步引擎连接", test_async_engine_connection_test),
        ("异步会话功能", test_async_session),
        ("连接池功能", test_connection_pool),
        ("异步依赖注入函数", test_async_dependency_injection),
        ("测试get_async_db_session函数", test_get_async_db_session_function),
    ]

    results = []
    for test_name, test_func in tests:
        try:
            result = await test_func()
            results.append((test_name, result))
        except Exception as e:
            print(f"❌ {test_name} 测试异常: {e}")
            results.append((test_name, False))

    # 汇总结果
    print("\n" + "=" * 50)
    print("📊 测试结果汇总")
    print("=" * 50)

    success_count = 0
    for test_name, result in results:
        status = "✅ 通过" if result else "❌ 失败"
        print(f"{test_name}: {status}")
        if result:
            success_count += 1

    print(f"\n总计: {success_count}/{len(results)} 测试通过")

    if success_count == len(results):
        print("🎉 所有测试通过! 异步数据库连接配置成功!")
        return True
    else:
        print("⚠️  部分测试失败,请检查配置")
        return False


if __name__ == "__main__":
    # 运行测试
    success = asyncio.run(main())
    sys.exit(0 if success else 1)

执行结果


    
    
    
  🚀 开始异步数据库连接测试...
📋 项目: fastapi

==================================================
测试 1: 直接使用 asyncpg 连接
==================================================
数据库配置:
  POSTGRES_SERVER: db
  POSTGRES_USER: service
  POSTGRES_DB: fastapi_demo_db

✅ 直接 asyncpg 连接测试成功!

==================================================
测试 2: SQLAlchemy 异步引擎连接
==================================================
✅ SQLAlchemy 异步引擎连接测试成功!

==================================================
测试 3: 异步会话功能
==================================================
✅ PostgreSQL 版本: PostgreSQL 12.22 (Debian 12.22-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
✅ 事务测试成功!

==================================================
测试 4: 连接池功能
==================================================
连接池状态:
  当前连接数: 60
  已归还连接: 1
  已借出连接: 0
  最大连接数: N/A

✅ 并发连接测试: 10/10 成功

==================================================
测试 5: 异步依赖注入函数
==================================================
✅ 依赖注入测试成功 - 数据库: fastapi_demo_db, 用户: service
✅ 依赖注入事务测试成功

==================================================
测试 6: get_async_db_session 函数
==================================================
✅ get_async_db_session 测试成功
   数据库: fastapi_demo_db
   用户: service
   版本: PostgreSQL 12.22 (Debian 12.22-1.pgdg120+1) on x86_64-pc-linux-gnu
✅ 事务测试成功

==================================================
📊 测试结果汇总
==================================================
直接 asyncpg 连接: ✅ 通过
SQLAlchemy 异步引擎连接: ✅ 通过
异步会话功能: ✅ 通过
连接池功能: ✅ 通过
异步依赖注入函数: ✅ 通过
测试get_async_db_session函数: ✅ 通过

总计: 6/6 测试通过
🎉 所有测试通过! 异步数据库连接配置成功!

结尾彩蛋

刚刷到的朋友注意啦!点击【关注】锁定宝藏库,从此升职加薪不迷路 ✨
若觉得内容有用,长按点赞3秒引爆爱心特效!你的每次互动,都是我深夜码字的星光 🌟

【三步操作,终身受益】
✅ 点击「关注」→ 持续收获成长能量
✅ 点亮「点赞」→ 为干货内容打call
✅ 设为「星标」⭐️→ 算法优先推送,更新不错过


广告时刻

📢 云资源限时福利
有云服务器、CDN、对象存储、网络防护等需求的朋友,欢迎联系下方腾讯云官方销售 👇
✔️ 内部专属折扣,价格更优
✔️ 量大可谈,支持定制方案
✔️ 技术咨询与售后无忧

引用链接

[1] 1M rows/s from Postgres to Python — magicstack: http://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/

 

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区