目 录CONTENT

文章目录

Python AsyncIO:学习和使用指南

Administrator
2025-10-17 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

 

字数 2380,阅读大约需 12 分钟

Python AsyncIO:学习和使用指南


通过 async/await 语法来声明 协程 是编写 asyncio 应用的推荐方式。掌握asyncio编程,是提高Python执行效率的必备语法!异步IO是一种并发编程模型,并非并行编程。

asyncio

asyncio 是用来编写并发代码的库,使用async/await 语法。 asyncio 被用作多个提供高性能Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

  1. 1. 单线程并发: asyncio 允许你在一个线程中同时处理多个任务,避免了多线程或多进程带来的开销和复杂性

  2. 2. 异步I/O: 它支持非阻塞I/O,当程序等待I/O 操作完成时,不会阻塞主线程,而是可以去执行其他任务,从而显著提高效率

  3. 3. 事件循环: 事件循环是 asyncio 的核心,它负责调度和运行异步任务,并在任务准备就绪时执行它们

  4. 4. 协程: asyncio 是基于协程的,async 和 await 关键字允许函数暂停和恢复执行,

文档

Python协程实现 | 李乾坤的博客[1]

协程与任务 — Python 3.13.9 文档[2]

Python's asyncio: A Hands-On Walkthrough – Real Python[3]

异步编程

传统的同步编程是阻塞的,一个任务必须完全完成后才能开始下一个任务。
异步编程是非阻塞的,当一个任务需要等待(例如等待网络响应)时,程序可以切换到执行另一个任务,而不是闲置等待。当等待的任务完成后,程序再回来继续执行它。

  1. 1. 协程 (Coroutines):协程是 asyncio 的核心。它们是特殊的函数,使用 async def 关键字定义。协程可以暂停执行,并在稍后从暂停的地方恢复。使用 await 关键字来等待另一个协程或可等待对象(如 asyncio.sleep())。当一个协程 await 另一个操作时,它会交出控制权给事件循环,允许事件循环执行其他准备好的任务。

  2. 2. 事件循环 (Event Loop):事件循环是 asyncio 的核心调度器。它负责管理和执行协程。事件循环会不断地检查是否有任务准备好运行,并调度它们执行。当一个协程暂停时,事件循环会选择下一个准备好的协程来运行。asyncio.run() 是启动事件循环并运行主协程的入口点。

  3. 3. (Tasks):asyncio.Task 是协程的包装器,它允许协程在事件循环中独立运行。你可以使用 asyncio.create_task() 将一个协程包装成一个任务,然后事件循环会调度这个任务。

常见异步编程框架

高质量的第三方库和框架支持asyncio或基于其构建,其中包括用于Web服务器、数据库、网络、测试等的工具。以下是一些最值得注意的:

Web框架:

  1. 1. FastAPI:用于构建Web API的现代异步Web框架。

  2. 2. Starlette:轻量级异步服务器网关接口(ASGI)框架,用于构建高性能异步Web应用。

  3. 3. Sanic:使用asyncio构建的、以速度为目标的异步Web框架。

  4. 4. Quart:异步Web微框架,具有与Flask相同的API。

  5. 5. 龙卷风(Tornado):高性能的Web框架和异步网络库。

ASGI服务器:

  1. 1. uvicorn:快速的ASGI Web服务器。

  2. 2. Hypercorn:支持多种协议和配置选项的ASGI服务器。

网络工具:

  1. 1. aiohttp:使用asyncio实现的HTTP客户端和服务器。

  2. 2. HTTPX:功能齐全的异步和同步HTTP客户端。

  3. 3. websockets:使用asyncio构建WebSocket服务器和客户端的库。

  4. 4. aiosmtplib:用于发送电子邮件的异步SMTP客户端。

数据库工具:

  1. 1. 数据库:与SQLAlchemy核心兼容的异步数据库访问层。

  2. 2. Tortoise ORM:轻量级异步对象关系映射器(ORM)。

  3. 3. Gino:基于SQLAlchemy核心为PostgreSQL构建的异步ORM。

  4. 4. Motor:基于asyncio构建的异步MongoDB驱动程序。

实用工具库:

  1. 1. aiofiles:对Python的文件API进行包装,以便与async和await配合使用。

  2. 2. aiocache:支持Redis和Memcached的异步缓存库。

  3. 3. APScheduler:一个支持异步作业的任务调度器。

  4. 4. pytest-asyncio:添加了使用pytest测试异步函数的支持。

常用代码


    
    
    
  import asyncio
import time

# 在 asyncio 中,asyncio.Task 是协程的包装器,用于在事件循环中调度和运行协程。
# 它允许协程并发执行,是实现并发的核心组件。
# 通常通过 asyncio.create_task() 来创建 Task 实例。

async def cancelable_task(name: str, delay: int):
    """
    一个可以被取消的异步任务。
    """
    try:
        print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 开始等待 {delay} 秒...")
        await asyncio.sleep(delay)
        print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 等待结束!")
        return f"{name} (可取消) 完成"
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 被取消了!")
        return f"{name} (可取消) 被取消"

async def say_hello(name: str, delay: int):
    """
    异步函数,模拟一个耗时操作。
    当 await asyncio.sleep(delay) 时,当前协程会暂停,将控制权交还给事件循环,
    允许其他协程运行。
    """
    try:
        print(f"[{time.strftime('%H:%M:%S')}] {name} 开始等待 {delay} 秒...")
        await asyncio.sleep(delay)  # 暂停执行,将控制权交给事件循环
        print(f"[{time.strftime('%H:%M:%S')}] {name} 等待结束!")
        return f"{name} 完成" # 返回任务结果
    except Exception as e:
        print(f"[{time.strftime('%H:%M:%S')}] {name} 任务发生异常: {e}")
        raise # 重新抛出异常,以便 gather 或 wait_for 捕获

async def main():
    """
    主协程,创建并运行多个任务。
    """
    print(f"[{time.strftime('%H:%M:%S')}] 主程序开始")

    # 使用 asyncio.create_task() 创建多个任务,它们将并发运行
    task1 = asyncio.create_task(say_hello("Alice", 3))
    task2 = asyncio.create_task(say_hello("Bob", 1))
    task3 = asyncio.create_task(say_hello("Charlie", 2))

    # 使用 asyncio.gather() 等待所有任务完成,并收集它们的结果
    # 使用 asyncio.gather() 等待所有任务完成,并收集它们的结果
    # asyncio.gather 会并发运行所有任务,并等待它们全部完成。
    # 返回值是所有任务的结果列表,顺序与传入任务的顺序一致。
    results = await asyncio.gather(task1, task2, task3)
    print(f"[{time.strftime('%H:%M:%S')}] 所有任务完成,结果: {results}")

    # 演示任务取消
    print(f"\n[{time.strftime('%H:%M:%S')}] 演示任务取消")
    cancel_task_instance = asyncio.create_task(cancelable_task("David", 5))
    await asyncio.sleep(1) # 等待一小段时间,让任务开始执行
    cancel_task_instance.cancel() # 取消任务
    try:
        await cancel_task_instance
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] 捕获到 David 任务的 CancelledError")

    # 演示任务超时
    print(f"\n[{time.strftime('%H:%M:%S')}] 演示任务超时")
    timeout_task_instance = asyncio.create_task(say_hello("Eve", 4))
    try:
        # 设置超时时间为 2 秒,如果任务在 2 秒内未完成,则会抛出 asyncio.TimeoutError
        result = await asyncio.wait_for(timeout_task_instance, timeout=2)
        print(f"[{time.strftime('%H:%M:%S')}] Eve 任务在超时前完成,结果: {result}")
    except asyncio.TimeoutError:
        print(f"[{time.strftime('%H:%M:%S')}] Eve 任务超时了!")
        # 超时后任务仍然在运行,需要手动取消
        timeout_task_instance.cancel()
        try:
            await timeout_task_instance
        except asyncio.CancelledError:
            print(f"[{time.strftime('%H:%M:%S')}] Eve 任务被取消。")

    # 演示异常处理
    print(f"\n[{time.strftime('%H:%M:%S')}] 演示异常处理")
    async def raise_error_task():
        """一个会抛出异常的异步任务。"""
        await asyncio.sleep(1)
        raise ValueError("这是一个模拟的错误!")

    error_task = asyncio.create_task(raise_error_task())
    try:
        await error_task
    except ValueError as e:
        print(f"[{time.strftime('%H:%M:%S')}] 捕获到异常: {e}")

    # 使用 gather 捕获异常
    async def task_with_error(name: str, delay: int, should_raise: bool = False):
        """一个可能抛出异常的异步任务。"""
        try:
            print(f"[{time.strftime('%H:%M:%S')}] {name} 开始等待 {delay} 秒...")
            await asyncio.sleep(delay)
            if should_raise:
                raise RuntimeError(f"{name} 模拟运行时错误")
            print(f"[{time.strftime('%H:%M:%S')}] {name} 等待结束!")
            return f"{name} 完成"
        except Exception as e:
            print(f"[{time.strftime('%H:%M:%S')}] {name} 任务发生异常: {e}")
            raise # 重新抛出异常

    task_ok = asyncio.create_task(task_with_error("Frank", 1))
    task_err = asyncio.create_task(task_with_error("Grace", 2, should_raise=True))

    # 当 return_exceptions=True 时,gather 会收集所有结果和异常,而不是在第一个异常时停止。
    gather_results_with_exceptions = await asyncio.gather(task_ok, task_err, return_exceptions=True)
    print(f"[{time.strftime('%H:%M:%S')}] gather 结果 (包含异常): {gather_results_with_exceptions}")

    # 当 return_exceptions=False (默认) 时,gather 会在第一个异常发生时立即停止并抛出异常。
    print(f"\n[{time.strftime('%H:%M:%S')}] 演示 gather 遇到异常时停止 (默认行为)")
    task_ok_2 = asyncio.create_task(task_with_error("Heidi", 1))
    task_err_2 = asyncio.create_task(task_with_error("Ivan", 2, should_raise=True))
    task_ok_3 = asyncio.create_task(task_with_error("Judy", 3))

    try:
        await asyncio.gather(task_ok_2, task_err_2, task_ok_3)
    except RuntimeError as e:
        print(f"[{time.strftime('%H:%M:%S')}] 捕获到 gather 抛出的异常: {e}")
        # 此时,其他未完成的任务可能仍在运行,需要手动取消
        for task in [task_ok_2, task_err_2, task_ok_3]:
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    print(f"[{time.strftime('%H:%M:%S')}] 未完成任务 {task.get_name()} 被取消。")

    print(f"[{time.strftime('%H:%M:%S')}] 主程序结束")

if __name__ == "__main__":
    # asyncio.run() 是运行 asyncio 应用程序的推荐入口点。
    # 它负责启动事件循环,运行顶层协程,并在协程完成后关闭事件循环。
    asyncio.run(main())

执行结果


    
    
    
  [10:47:43] 主程序开始
[10:47:43] Alice 开始等待 3 秒...
[10:47:43] Bob 开始等待 1 秒...
[10:47:43] Charlie 开始等待 2 秒...
[10:47:44] Bob 等待结束!
[10:47:45] Charlie 等待结束!
[10:47:46] Alice 等待结束!
[10:47:46] 所有任务完成,结果: ['Alice 完成', 'Bob 完成', 'Charlie 完成']

[10:47:46] 演示任务取消
[10:47:46] David (可取消) 开始等待 5 秒...
[10:47:47] David (可取消) 被取消了!

[10:47:47] 演示任务超时
[10:47:47] Eve 开始等待 4 秒...
[10:47:49] Eve 任务超时了!
[10:47:49] Eve 任务被取消。

[10:47:49] 演示异常处理
[10:47:50] 捕获到异常: 这是一个模拟的错误!
[10:47:50] Frank 开始等待 1 秒...
[10:47:50] Grace 开始等待 2 秒...
[10:47:51] Frank 等待结束!
[10:47:52] Grace 任务发生异常: Grace 模拟运行时错误
[10:47:52] gather 结果 (包含异常): ['Frank 完成', RuntimeError('Grace 模拟运行时错误')]

[10:47:52] 演示 gather 遇到异常时停止 (默认行为)
[10:47:52] Heidi 开始等待 1 秒...
[10:47:52] Ivan 开始等待 2 秒...
[10:47:52] Judy 开始等待 3 秒...
[10:47:53] Heidi 等待结束!
[10:47:54] Ivan 任务发生异常: Ivan 模拟运行时错误
[10:47:54] 捕获到 gather 抛出的异常: Ivan 模拟运行时错误
[10:47:54] 未完成任务 Task-12 被取消。
[10:47:54] 主程序结束

进程已结束,退出代码为 0

引用链接

[1] Python协程实现 | 李乾坤的博客: https://qiankunli.github.io/2024/07/19/python_asyncio.html
[2] 协程与任务 — Python 3.13.9 文档: https://docs.python.org/zh-cn/3.13/library/asyncio-task.html
[3] Python's asyncio: A Hands-On Walkthrough – Real Python: https://realpython.com/async-io-python/

 

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区