目 录CONTENT

文章目录

how to init session in sqlalchemy?

Administrator
2024-12-24 / 0 评论 / 0 点赞 / 2 阅读 / 6119 字

使用sessionmaker在同步和异步函数创建DB会话

会话工厂

通过sessionmaker,我们得到一个类,一个能产生session的工厂

在同一个线程中:

  1. scoped_session(bind=engine)的时候,返回的是同一个session对象,先在Registry里找找之前是否已经创建session了。

要是有,就把这个session返回

  1. sessionmaker(bind=engine)的时候, 返回的是不相同session对象

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from casbin_sqlalchemy_adapter import Adapter
from app.core.config import settings

# 创建数据库引擎
engine = create_engine(
    settings.SQLALCHEMY_DATABASE_URI,  # 数据库URI
    pool_pre_ping=True,  # 启用连接池的预ping功能
    pool_size=settings.SQLALCHEMY_POOL_SIZE,  # 连接池大小
    pool_recycle=settings.SQLALCHEMY_POOL_RECYCLE,  # 连接池回收时间
    pool_timeout=settings.SQLALCHEMY_POOL_TIMEOUT,  # 连接池超时时间
    max_overflow=settings.SQLALCHEMY_POOL_OVERFLOW,  # 连接池最大溢出
    connect_args=dict(
        options=(
            "-c idle_in_transaction_session_timeout="
            f"{settings.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT}"  # 设置PostgreSQL的空闲事务超时
        )
    ),
)

# 创建Casbin适配器
adapter = Adapter(engine)

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  • 创建数据库引擎

  • settings.SQLALCHEMY_DATABASE_URI: 数据库的连接URI。

  • pool_pre_ping=True: 启用连接池的预ping功能,确保连接有效。

  • pool_size, pool_recycle, pool_timeout, max_overflow: 配置连接池的大小、回收时间、超时时间和最大溢出。

  • connect_args: 传递给数据库连接的额外参数,这里设置了PostgreSQL的空闲事务超时。

  • 创建Casbin适配器 (Adapter):

  • Adapter(engine): 使用创建的数据库引擎初始化Casbin的SQLAlchemy适配器。

  • 创建会话工厂 (sessionmaker): SessionLocal: 创建一个会话工厂,用于生成数据库会话对象,autocommitautoflush都设置为False

生成函数

在 FastAPI 中,通常会将 get_db 函数作为依赖项注入到路由处理函数中,以便在处理请求时自动获取和关闭数据库会话

同步

def get_db() -> Generator:
    try:
        db = SessionLocal()  # 创建一个新的数据库会话
        yield db  # 生成数据库会话对象,供调用者使用,在 FastAPI 中,这通常用于依赖注入,使得每个请求都可以获得一个独立的数据库会话。
    finally:
        db.close()  # 确保会话在使用后关闭

from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/items/{item_id}")
def read_item(item_id: int, db: Session = Depends(get_db)):
    # 使用 db 进行数据库操作
    # ...
    return {"item_id": item_id}
# 在这种情况下,db.close() 会在请求处理完成后自动执行

在 SQLAlchemy 中,db.close() 方法用于关闭当前的数据库会话,但它并不会直接关闭底层的物理数据库连接。相反,它会将连接返回到连接池中,以便后续的会话可以重用这些连接

异步

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

async_engine = create_async_engine(settings.SQLALCHEMY_DATABASE_URI)
AsyncSessionLocal = sessionmaker(
    bind=async_engine, class_=AsyncSession, expire_on_commit=False
)

async def get_async_db() -> AsyncSession:
    async with AsyncSessionLocal() as db:
        yield db

@app.get("/items/{item_id}")
async def read_item(item_id: int, db: AsyncSession = Depends(get_async_db)):
    # 使用 db 进行数据库操作
    # ...
    return {"item_id": item_id}

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区