使用sessionmaker在同步和异步函数创建DB会话
会话工厂
通过sessionmaker,我们得到一个类,一个能产生session的工厂
在同一个线程中:
scoped_session(bind=engine)的时候,返回的是同一个session对象,先在Registry里找找之前是否已经创建session了。
要是有,就把这个session返回
sessionmaker(bind=engine)的时候, 返回的是不相同session对象
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from casbin_sqlalchemy_adapter import Adapter
from app.core.config import settings
# 创建数据库引擎
engine = create_engine(
settings.SQLALCHEMY_DATABASE_URI, # 数据库URI
pool_pre_ping=True, # 启用连接池的预ping功能
pool_size=settings.SQLALCHEMY_POOL_SIZE, # 连接池大小
pool_recycle=settings.SQLALCHEMY_POOL_RECYCLE, # 连接池回收时间
pool_timeout=settings.SQLALCHEMY_POOL_TIMEOUT, # 连接池超时时间
max_overflow=settings.SQLALCHEMY_POOL_OVERFLOW, # 连接池最大溢出
connect_args=dict(
options=(
"-c idle_in_transaction_session_timeout="
f"{settings.POSTGRES_IDLE_IN_TRANSACTION_SESSION_TIMEOUT}" # 设置PostgreSQL的空闲事务超时
)
),
)
# 创建Casbin适配器
adapter = Adapter(engine)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
创建数据库引擎
settings.SQLALCHEMY_DATABASE_URI: 数据库的连接URI。
pool_pre_ping=True: 启用连接池的预ping功能,确保连接有效。
pool_size, pool_recycle, pool_timeout, max_overflow: 配置连接池的大小、回收时间、超时时间和最大溢出。
connect_args: 传递给数据库连接的额外参数,这里设置了PostgreSQL的空闲事务超时。
创建Casbin适配器 (Adapter):
Adapter(engine): 使用创建的数据库引擎初始化Casbin的SQLAlchemy适配器。
创建会话工厂 (sessionmaker): SessionLocal: 创建一个会话工厂,用于生成数据库会话对象,autocommit和autoflush都设置为False。
生成函数
在 FastAPI 中,通常会将 get_db 函数作为依赖项注入到路由处理函数中,以便在处理请求时自动获取和关闭数据库会话
同步
def get_db() -> Generator:
try:
db = SessionLocal() # 创建一个新的数据库会话
yield db # 生成数据库会话对象,供调用者使用,在 FastAPI 中,这通常用于依赖注入,使得每个请求都可以获得一个独立的数据库会话。
finally:
db.close() # 确保会话在使用后关闭
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session
app = FastAPI()
@app.get("/items/{item_id}")
def read_item(item_id: int, db: Session = Depends(get_db)):
# 使用 db 进行数据库操作
# ...
return {"item_id": item_id}
# 在这种情况下,db.close() 会在请求处理完成后自动执行
在 SQLAlchemy 中,db.close() 方法用于关闭当前的数据库会话,但它并不会直接关闭底层的物理数据库连接。相反,它会将连接返回到连接池中,以便后续的会话可以重用这些连接
异步
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
async_engine = create_async_engine(settings.SQLALCHEMY_DATABASE_URI)
AsyncSessionLocal = sessionmaker(
bind=async_engine, class_=AsyncSession, expire_on_commit=False
)
async def get_async_db() -> AsyncSession:
async with AsyncSessionLocal() as db:
yield db
@app.get("/items/{item_id}")
async def read_item(item_id: int, db: AsyncSession = Depends(get_async_db)):
# 使用 db 进行数据库操作
# ...
return {"item_id": item_id}
评论区