典型路径操作
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}
Python语法异步请求
results = await some_library()
@app.get('/')
def results():
results = some_library()
return results
@app.get('/')
async def read_results():
results = await some_library()
return results
定义三方库some_library
实时访问某些三方库,可以简单的使用def就行,不用加上await(正在使用一个第三方库和某些组件(比如:数据库、API、文件系统...)进行通信,第三方库又不支持使用 await (目前大多数数据库三方库都是这样),这种情况你可以像平常那样使用 def 声明一个路径操作函数)
只能在被
async def
创建的函数内使用await
是否使用async,FastAPI都将异步工作,以达到"Fast"的运行速度
举例
import asyncio
# 定义一个异步函数
async def hello_world():
print('Hello, World!')
# 模拟一些耗时的操作
await asyncio.sleep(1)
print('...and goodbye!')
# 主异步函数
async def main():
print("Starting...")
await hello_world() # 调用并等待 hello_world 函数完成
print("Finished!")
# 获取当前的事件循环并运行主异步函数
asyncio.run(main())
D:\Code\ctyun\venv\PythonLearning\Scripts\python.exe D:\Code\PythonLearning\src\async_await\test.py
Starting...
Hello, World!
...and goodbye!
Finished!
import asyncio
async def hello_world():
print('Hello, World!')
await asyncio.sleep(3)
print('...and goodbye!')
async def good_morning():
print('Good morning!')
await asyncio.sleep(2)
print('...and have a great day!')
async def main():
print("Starting...")
# 使用 asyncio.gather 来并发执行两个任务
await asyncio.gather(hello_world(), good_morning())
print("Finished!")
# 运行 main 函数
asyncio.run(main())
D:\Code\ctyun\venv\PythonLearning\Scripts\python.exe D:\Code\PythonLearning\src\async_await\test.py
Starting...
Hello, World!
Good morning!
...and have a great day!
...and goodbye!
Finished!
IO密集操作
客户端通过网络发送数据;
服务端通过网络发送数据;
程序从磁盘读取文件内容;
程序将文件内容写入磁盘;
远程API操作;
数据库操作;
数据库查询返回结果;
这些操作主要阻塞在IO等待,所以又叫做IO密集型。
CPU密集型操作
CPU 密集型操作的常见示例是需要复杂的数学处理
音频或**图像**处理;
计算机视觉: 一幅图像由数百万像素组成,每个像素有3种颜色值,处理通常需要同时对这些像素进行计算;
机器学习: 它通常需要大量的"矩阵"和"向量"乘法。想象一个包含数字的巨大电子表格,并同时将所有数字相乘;
深度学习: 这是机器学习的一个子领域,同样适用。只是没有一个数字的电子表格可以相乘,而是一个庞大的数字集合,在很多情况下,你需要使用一个特殊的处理器来构建和使用这些模型。
无论是否轮流执行(并发),都需要相同的时间来完成,而你也会完成相同的工作量
异步任务
被称为"异步"的原因是因为计算机/程序不必与慢任务"同步",去等待任务完成的确切时刻,而在此期间不做任何事情直到能够获取任务结果才继续工作
作为一个"异步"系统,一旦完成,任务就可以排队等待一段时间(几微秒),等待计算机程序完成它要做的任何事情,然后回来获取结果并继续处理它们
并发和并行
服务器正在等待很多很多用户通过他们不太好的网络发送来的请求,这就是为什么使用异步对于 Web API 很有意义的原因
async & await的使用技术细节
事件循环机制:异步函数-✓,如果需要异步函数里有阻塞IO操作,请阅读此文档
# 在异步函数里使用同步阻塞性的函数
await asyncio.to_thread(blocking_io_operation)
# blocking_io_operation是同步函数
burgers = await get_burgers(2)
async def get_burgers(number: int):
# Do some asynchronous stuff to create the burgers
return burgers
要使
await
工作,它必须位于支持这种异步机制的函数内使用的库可以使用
await
调用它,则需要使用async def
创建路径操作函数【路径操作函数】习惯于用普通的
def
定义普通的仅计算路径操作函数,以获得微小的性能增益(大约100纳秒),请注意,在 FastAPI 中,效果将完全相反。在这些情况下,最好使用async def
,除非路径操作函数内使用执行阻塞 I/O 的代码依赖: 如果一个依赖是标准的
def
函数而不是async def
,它将被运行在外部线程池中
from typing import Union
from fastapi import Depends, FastAPI
app = FastAPI()
async def common_parameters(
q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
return {"q": q, "skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
return commons
拥有多个相互依赖的依赖以及子依赖 (作为函数的参数),它们中的一些可能是通过
async def
声明,也可能是通过def
声明。它们仍然可以正常工作,这些通过def
声明的函数将会在外部线程中调用(来自线程池),而不是"被等待"创建函数时使用的是常规的 def 关键字,例如 def my_function(): ,那么当在程序的其他部分需要使用这个函数时,会按照代码编写的顺序直接执行这个函数,不会将其放入线程池中进行异步处理
如果这个函数通过
async def
声明,当在代码中调用时,应该使用await
等待函数的结果
思考
数据库操作是IO阻塞,那么fastapi为什么在操作数据库时,路径函数使用async?
数据库操作本身通常是 I/O 阻塞的,但现代数据库驱动程序和 ORM 工具(如 SQLAlchemy、Tortoise-ORM 等)提供了异步版本,可以与 FastAPI 的异步特性协同工作[我们目前crud定义的都是同步性数据库操作]
使用
create_async_engine
创建了一个异步引擎,并使用sessionmaker
创建了一个异步会话
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def create_user(name: str, age: int):
async with async_session() as session:
user = User(name=name, age=age)
session.add(user)
await session.commit()
await session.refresh(user)
return user
async def get_user(user_id: int):
async with async_session() as session:
result = await session.execute(select(User).where(User.id == user_id))
return result.scalars().first()
async def update_user(user_id: int, name: str, age: int):
async with async_session() as session:
user = await session.get(User, user_id)
if user:
user.name = name
user.age = age
await session.commit()
await session.refresh(user)
return user
async def delete_user(user_id: int):
async with async_session() as session:
user = await session.get(User, user_id)
if user:
await session.delete(user)
await session.commit()
return True
async def main():
await create_tables()
new_user = await create_user("Alice", 30
路径函数是普通def,但是路径函数的代码里使用了SQLAlchemy操作数据库,SQLAlchemy是异步操作,会导致什么影响呢?路径函数是普通的
def
定义而非async def
,但在其中使用了 SQLAlchemy 的异步 API 来操作数据库,这将会导致一些问题。主要的问题是同步路径函数无法直接等待异步操作完成
同步路径函数不能直接调用异步函数,因为同步函数没有异步上下文。尝试这样做会导致RuntimeError,需要使用
asyncio.run
或其他异步执行机制
import asyncio
# 异步函数
async def async_function():
print("异步函数开始")
await asyncio.sleep(1) # 模拟异步操作
print("异步函数结束")
# 同步路径函数
def sync_path_function():
print("同步路径函数开始")
# 不能直接调用异步函数,需要使用 asyncio.run
asyncio.run(async_function())
print("同步路径函数结束")
# 运行同步路径函数
sync_path_function()
路径函数是异步,但是内部引用的函数是普通def,会有什么问题?
直接在异步函数中调用同步函数(如果同步函数有阻塞操作)会导致事件循环被阻塞,因为同步函数会等待直到完成才返回控制权给事件循环。这意味着在等待同步函数完成期间,事件循环无法处理其他异步任务
同步函数(使用普通def
定义的函数)并不在异步上下文中执行,它们在调用它们的线程中同步执行
import asyncio
# 同步函数
def sync_function():
print("同步函数开始")
# 模拟阻塞操作
for i in range(5):
print(f"同步操作 {i}")
print("同步函数结束")
# 异步路径函数
async def async_path_function():
print("异步路径函数开始")
sync_function() # 调用同步函数
print("异步路径函数结束")
# 运行异步路径函数
asyncio.run(async_path_function())
“FastAPI会对路径操作函数(path operation function)和依赖(dependencies)进行特殊处理。这个特殊处理是:如果你把函数定义为def而不是async def,那么FastAPI会把它放到单独的线程池中,异步执行,这就是FastAPI精彩的地方。就像官方所说,如果你不清楚你函数里面的调用是不是异步(能不能用await),那么就把它定义为普通函数,FastAPI会采用多线程的方式处理。乱用async,在async里面有同步调用,则会变成串行,Fast秒变Slow” -- https://www.cnblogs.com/df888/p/16890685.html
评论区