目 录CONTENT

文章目录

FastAPI快的原因:异步并发

Administrator
2024-08-14 / 2 评论 / 0 点赞 / 129 阅读 / 0 字

典型路径操作

from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}


@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Union[str, None] = None):
    return {"item_id": item_id, "q": q}

Python语法异步请求

results = await some_library()

@app.get('/')
def results():
    results = some_library()
    return results
    
@app.get('/')
async def read_results():
    results = await some_library()
    return results
  1. 定义三方库some_library

  2. 实时访问某些三方库,可以简单的使用def就行,不用加上await(正在使用一个第三方库和某些组件(比如:数据库、API、文件系统...)进行通信,第三方库又不支持使用 await (目前大多数数据库三方库都是这样),这种情况你可以像平常那样使用 def 声明一个路径操作函数)

  3. 只能在被 async def 创建的函数内使用 await

是否使用async,FastAPI都将异步工作,以达到"Fast"的运行速度

举例

import asyncio

# 定义一个异步函数
async def hello_world():
    print('Hello, World!')
    # 模拟一些耗时的操作
    await asyncio.sleep(1)
    print('...and goodbye!')

# 主异步函数
async def main():
    print("Starting...")
    await hello_world()  # 调用并等待 hello_world 函数完成
    print("Finished!")

# 获取当前的事件循环并运行主异步函数
asyncio.run(main())

D:\Code\ctyun\venv\PythonLearning\Scripts\python.exe D:\Code\PythonLearning\src\async_await\test.py 
Starting...
Hello, World!
...and goodbye!
Finished!
import asyncio

async def hello_world():
    print('Hello, World!')
    await asyncio.sleep(3)
    print('...and goodbye!')

async def good_morning():
    print('Good morning!')
    await asyncio.sleep(2)
    print('...and have a great day!')

async def main():
    print("Starting...")
    # 使用 asyncio.gather 来并发执行两个任务
    await asyncio.gather(hello_world(), good_morning())
    print("Finished!")

# 运行 main 函数
asyncio.run(main())

D:\Code\ctyun\venv\PythonLearning\Scripts\python.exe D:\Code\PythonLearning\src\async_await\test.py 
Starting...
Hello, World!
Good morning!
...and have a great day!
...and goodbye!
Finished!

IO密集操作

  • 客户端通过网络发送数据;

  • 服务端通过网络发送数据;

  • 程序从磁盘读取文件内容;

  • 程序将文件内容写入磁盘;

  • 远程API操作;

  • 数据库操作;

  • 数据库查询返回结果;

这些操作主要阻塞在IO等待,所以又叫做IO密集型。

CPU密集型操作

CPU 密集型操作的常见示例是需要复杂的数学处理

  • 音频或**图像**处理;

  • 计算机视觉: 一幅图像由数百万像素组成,每个像素有3种颜色值,处理通常需要同时对这些像素进行计算;

  • 机器学习: 它通常需要大量的"矩阵"和"向量"乘法。想象一个包含数字的巨大电子表格,并同时将所有数字相乘;

  • 深度学习: 这是机器学习的一个子领域,同样适用。只是没有一个数字的电子表格可以相乘,而是一个庞大的数字集合,在很多情况下,你需要使用一个特殊的处理器来构建和使用这些模型。

无论是否轮流执行(并发),都需要相同的时间来完成,而你也会完成相同的工作量

异步任务

被称为"异步"的原因是因为计算机/程序不必与慢任务"同步",去等待任务完成的确切时刻,而在此期间不做任何事情直到能够获取任务结果才继续工作

作为一个"异步"系统,一旦完成,任务就可以排队等待一段时间(几微秒),等待计算机程序完成它要做的任何事情,然后回来获取结果并继续处理它们

并发和并行

服务器正在等待很多很多用户通过他们不太好的网络发送来的请求,这就是为什么使用异步对于 Web API 很有意义的原因

async & await的使用技术细节

事件循环机制:异步函数-✓,如果需要异步函数里有阻塞IO操作,请阅读此文档

# 在异步函数里使用同步阻塞性的函数
await asyncio.to_thread(blocking_io_operation)
# blocking_io_operation是同步函数
burgers = await get_burgers(2)
async def get_burgers(number: int):
    # Do some asynchronous stuff to create the burgers
    return burgers
  1. 要使 await 工作,它必须位于支持这种异步机制的函数内

  2. 使用的库可以使用 await 调用它,则需要使用 async def 创建路径操作函数

  3. 【路径操作函数】习惯于用普通的 def 定义普通的仅计算路径操作函数,以获得微小的性能增益(大约100纳秒),请注意,在 FastAPI 中,效果将完全相反。在这些情况下,最好使用 async def,除非路径操作函数内使用执行阻塞 I/O 的代码

  4. 依赖: 如果一个依赖是标准的 def 函数而不是 async def,它将被运行在外部线程池中

from typing import Union

from fastapi import Depends, FastAPI

app = FastAPI()


async def common_parameters(
    q: Union[str, None] = None, skip: int = 0, limit: int = 100
):
    return {"q": q, "skip": skip, "limit": limit}


@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons


@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons
  1. 拥有多个相互依赖的依赖以及子依赖 (作为函数的参数),它们中的一些可能是通过 async def 声明,也可能是通过 def 声明。它们仍然可以正常工作,这些通过 def 声明的函数将会在外部线程中调用(来自线程池),而不是"被等待"

  2. 创建函数时使用的是常规的 def 关键字,例如 def my_function(): ,那么当在程序的其他部分需要使用这个函数时,会按照代码编写的顺序直接执行这个函数,不会将其放入线程池中进行异步处理

  3. 如果这个函数通过 async def 声明,当在代码中调用时,应该使用 await 等待函数的结果

思考

  1. 数据库操作是IO阻塞,那么fastapi为什么在操作数据库时,路径函数使用async?

    1. 数据库操作本身通常是 I/O 阻塞的,但现代数据库驱动程序和 ORM 工具(如 SQLAlchemy、Tortoise-ORM 等)提供了异步版本,可以与 FastAPI 的异步特性协同工作[我们目前crud定义的都是同步性数据库操作]

    2. 使用create_async_engine创建了一个异步引擎,并使用sessionmaker创建了一个异步会话

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)

async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

async def create_user(name: str, age: int):
    async with async_session() as session:
        user = User(name=name, age=age)
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user

async def get_user(user_id: int):
    async with async_session() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        return result.scalars().first()

async def update_user(user_id: int, name: str, age: int):
    async with async_session() as session:
        user = await session.get(User, user_id)
        if user:
            user.name = name
            user.age = age
            await session.commit()
            await session.refresh(user)
            return user

async def delete_user(user_id: int):
    async with async_session() as session:
        user = await session.get(User, user_id)
        if user:
            await session.delete(user)
            await session.commit()
            return True

async def main():
    await create_tables()
    new_user = await create_user("Alice", 30
  1. 路径函数是普通def,但是路径函数的代码里使用了SQLAlchemy操作数据库,SQLAlchemy是异步操作,会导致什么影响呢路径函数是普通的 def 定义而非 async def,但在其中使用了 SQLAlchemy 的异步 API 来操作数据库,这将会导致一些问题。主要的问题是同步路径函数无法直接等待异步操作完成

  • 同步路径函数不能直接调用异步函数,因为同步函数没有异步上下文。尝试这样做会导致RuntimeError,需要使用asyncio.run或其他异步执行机制

import asyncio

# 异步函数
async def async_function():
    print("异步函数开始")
    await asyncio.sleep(1)  # 模拟异步操作
    print("异步函数结束")

# 同步路径函数
def sync_path_function():
    print("同步路径函数开始")
    # 不能直接调用异步函数,需要使用 asyncio.run
    asyncio.run(async_function())
    print("同步路径函数结束")

# 运行同步路径函数
sync_path_function()
  1. 路径函数是异步,但是内部引用的函数是普通def,会有什么问题?

直接在异步函数中调用同步函数(如果同步函数有阻塞操作)会导致事件循环被阻塞,因为同步函数会等待直到完成才返回控制权给事件循环。这意味着在等待同步函数完成期间,事件循环无法处理其他异步任务

同步函数(使用普通def定义的函数)并不在异步上下文中执行,它们在调用它们的线程中同步执行

import asyncio

# 同步函数
def sync_function():
    print("同步函数开始")
    # 模拟阻塞操作
    for i in range(5):
        print(f"同步操作 {i}")
    print("同步函数结束")

# 异步路径函数
async def async_path_function():
    print("异步路径函数开始")
    sync_function()  # 调用同步函数
    print("异步路径函数结束")

# 运行异步路径函数
asyncio.run(async_path_function())
  1. FastAPI会对路径操作函数(path operation function)和依赖(dependencies)进行特殊处理。这个特殊处理是:如果你把函数定义为def而不是async def,那么FastAPI会把它放到单独的线程池中,异步执行,这就是FastAPI精彩的地方。就像官方所说,如果你不清楚你函数里面的调用是不是异步(能不能用await),那么就把它定义为普通函数,FastAPI会采用多线程的方式处理。乱用async,在async里面有同步调用,则会变成串行,Fast秒变Slow” -- https://www.cnblogs.com/df888/p/16890685.html

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区