字数 2380,阅读大约需 12 分钟
Python AsyncIO:学习和使用指南
通过 async/await 语法来声明 协程 是编写 asyncio 应用的推荐方式。掌握asyncio编程,是提高Python执行效率的必备语法!异步IO是一种并发编程模型,并非并行编程。
asyncio
asyncio 是用来编写并发代码的库,使用async/await 语法。 asyncio 被用作多个提供高性能Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
1. 单线程并发: asyncio 允许你在一个线程中同时处理多个任务,避免了多线程或多进程带来的开销和复杂性
2. 异步I/O: 它支持非阻塞I/O,当程序等待I/O 操作完成时,不会阻塞主线程,而是可以去执行其他任务,从而显著提高效率
3. 事件循环: 事件循环是 asyncio 的核心,它负责调度和运行异步任务,并在任务准备就绪时执行它们
4. 协程: asyncio 是基于协程的,async 和 await 关键字允许函数暂停和恢复执行,
文档
Python协程实现 | 李乾坤的博客[1]
协程与任务 — Python 3.13.9 文档[2]
Python's asyncio: A Hands-On Walkthrough – Real Python[3]
异步编程
传统的同步编程是阻塞的,一个任务必须完全完成后才能开始下一个任务。
异步编程是非阻塞的,当一个任务需要等待(例如等待网络响应)时,程序可以切换到执行另一个任务,而不是闲置等待。当等待的任务完成后,程序再回来继续执行它。
1. 协程 (Coroutines):协程是 asyncio 的核心。它们是特殊的函数,使用 async def 关键字定义。协程可以暂停执行,并在稍后从暂停的地方恢复。使用 await 关键字来等待另一个协程或可等待对象(如 asyncio.sleep())。当一个协程 await 另一个操作时,它会交出控制权给事件循环,允许事件循环执行其他准备好的任务。
2. 事件循环 (Event Loop):事件循环是 asyncio 的核心调度器。它负责管理和执行协程。事件循环会不断地检查是否有任务准备好运行,并调度它们执行。当一个协程暂停时,事件循环会选择下一个准备好的协程来运行。asyncio.run() 是启动事件循环并运行主协程的入口点。
3. (Tasks):asyncio.Task 是协程的包装器,它允许协程在事件循环中独立运行。你可以使用 asyncio.create_task() 将一个协程包装成一个任务,然后事件循环会调度这个任务。
常见异步编程框架
高质量的第三方库和框架支持asyncio或基于其构建,其中包括用于Web服务器、数据库、网络、测试等的工具。以下是一些最值得注意的:
Web框架:
1. FastAPI:用于构建Web API的现代异步Web框架。
2. Starlette:轻量级异步服务器网关接口(ASGI)框架,用于构建高性能异步Web应用。
3. Sanic:使用asyncio构建的、以速度为目标的异步Web框架。
4. Quart:异步Web微框架,具有与Flask相同的API。
5. 龙卷风(Tornado):高性能的Web框架和异步网络库。
ASGI服务器:
1. uvicorn:快速的ASGI Web服务器。
2. Hypercorn:支持多种协议和配置选项的ASGI服务器。
网络工具:
1. aiohttp:使用asyncio实现的HTTP客户端和服务器。
2. HTTPX:功能齐全的异步和同步HTTP客户端。
3. websockets:使用asyncio构建WebSocket服务器和客户端的库。
4. aiosmtplib:用于发送电子邮件的异步SMTP客户端。
数据库工具:
1. 数据库:与SQLAlchemy核心兼容的异步数据库访问层。
2. Tortoise ORM:轻量级异步对象关系映射器(ORM)。
3. Gino:基于SQLAlchemy核心为PostgreSQL构建的异步ORM。
4. Motor:基于asyncio构建的异步MongoDB驱动程序。
实用工具库:
1. aiofiles:对Python的文件API进行包装,以便与async和await配合使用。
2. aiocache:支持Redis和Memcached的异步缓存库。
3. APScheduler:一个支持异步作业的任务调度器。
4. pytest-asyncio:添加了使用pytest测试异步函数的支持。
常用代码
import asyncio
import time
# 在 asyncio 中,asyncio.Task 是协程的包装器,用于在事件循环中调度和运行协程。
# 它允许协程并发执行,是实现并发的核心组件。
# 通常通过 asyncio.create_task() 来创建 Task 实例。
async def cancelable_task(name: str, delay: int):
"""
一个可以被取消的异步任务。
"""
try:
print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 开始等待 {delay} 秒...")
await asyncio.sleep(delay)
print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 等待结束!")
return f"{name} (可取消) 完成"
except asyncio.CancelledError:
print(f"[{time.strftime('%H:%M:%S')}] {name} (可取消) 被取消了!")
return f"{name} (可取消) 被取消"
async def say_hello(name: str, delay: int):
"""
异步函数,模拟一个耗时操作。
当 await asyncio.sleep(delay) 时,当前协程会暂停,将控制权交还给事件循环,
允许其他协程运行。
"""
try:
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始等待 {delay} 秒...")
await asyncio.sleep(delay) # 暂停执行,将控制权交给事件循环
print(f"[{time.strftime('%H:%M:%S')}] {name} 等待结束!")
return f"{name} 完成" # 返回任务结果
except Exception as e:
print(f"[{time.strftime('%H:%M:%S')}] {name} 任务发生异常: {e}")
raise # 重新抛出异常,以便 gather 或 wait_for 捕获
async def main():
"""
主协程,创建并运行多个任务。
"""
print(f"[{time.strftime('%H:%M:%S')}] 主程序开始")
# 使用 asyncio.create_task() 创建多个任务,它们将并发运行
task1 = asyncio.create_task(say_hello("Alice", 3))
task2 = asyncio.create_task(say_hello("Bob", 1))
task3 = asyncio.create_task(say_hello("Charlie", 2))
# 使用 asyncio.gather() 等待所有任务完成,并收集它们的结果
# 使用 asyncio.gather() 等待所有任务完成,并收集它们的结果
# asyncio.gather 会并发运行所有任务,并等待它们全部完成。
# 返回值是所有任务的结果列表,顺序与传入任务的顺序一致。
results = await asyncio.gather(task1, task2, task3)
print(f"[{time.strftime('%H:%M:%S')}] 所有任务完成,结果: {results}")
# 演示任务取消
print(f"\n[{time.strftime('%H:%M:%S')}] 演示任务取消")
cancel_task_instance = asyncio.create_task(cancelable_task("David", 5))
await asyncio.sleep(1) # 等待一小段时间,让任务开始执行
cancel_task_instance.cancel() # 取消任务
try:
await cancel_task_instance
except asyncio.CancelledError:
print(f"[{time.strftime('%H:%M:%S')}] 捕获到 David 任务的 CancelledError")
# 演示任务超时
print(f"\n[{time.strftime('%H:%M:%S')}] 演示任务超时")
timeout_task_instance = asyncio.create_task(say_hello("Eve", 4))
try:
# 设置超时时间为 2 秒,如果任务在 2 秒内未完成,则会抛出 asyncio.TimeoutError
result = await asyncio.wait_for(timeout_task_instance, timeout=2)
print(f"[{time.strftime('%H:%M:%S')}] Eve 任务在超时前完成,结果: {result}")
except asyncio.TimeoutError:
print(f"[{time.strftime('%H:%M:%S')}] Eve 任务超时了!")
# 超时后任务仍然在运行,需要手动取消
timeout_task_instance.cancel()
try:
await timeout_task_instance
except asyncio.CancelledError:
print(f"[{time.strftime('%H:%M:%S')}] Eve 任务被取消。")
# 演示异常处理
print(f"\n[{time.strftime('%H:%M:%S')}] 演示异常处理")
async def raise_error_task():
"""一个会抛出异常的异步任务。"""
await asyncio.sleep(1)
raise ValueError("这是一个模拟的错误!")
error_task = asyncio.create_task(raise_error_task())
try:
await error_task
except ValueError as e:
print(f"[{time.strftime('%H:%M:%S')}] 捕获到异常: {e}")
# 使用 gather 捕获异常
async def task_with_error(name: str, delay: int, should_raise: bool = False):
"""一个可能抛出异常的异步任务。"""
try:
print(f"[{time.strftime('%H:%M:%S')}] {name} 开始等待 {delay} 秒...")
await asyncio.sleep(delay)
if should_raise:
raise RuntimeError(f"{name} 模拟运行时错误")
print(f"[{time.strftime('%H:%M:%S')}] {name} 等待结束!")
return f"{name} 完成"
except Exception as e:
print(f"[{time.strftime('%H:%M:%S')}] {name} 任务发生异常: {e}")
raise # 重新抛出异常
task_ok = asyncio.create_task(task_with_error("Frank", 1))
task_err = asyncio.create_task(task_with_error("Grace", 2, should_raise=True))
# 当 return_exceptions=True 时,gather 会收集所有结果和异常,而不是在第一个异常时停止。
gather_results_with_exceptions = await asyncio.gather(task_ok, task_err, return_exceptions=True)
print(f"[{time.strftime('%H:%M:%S')}] gather 结果 (包含异常): {gather_results_with_exceptions}")
# 当 return_exceptions=False (默认) 时,gather 会在第一个异常发生时立即停止并抛出异常。
print(f"\n[{time.strftime('%H:%M:%S')}] 演示 gather 遇到异常时停止 (默认行为)")
task_ok_2 = asyncio.create_task(task_with_error("Heidi", 1))
task_err_2 = asyncio.create_task(task_with_error("Ivan", 2, should_raise=True))
task_ok_3 = asyncio.create_task(task_with_error("Judy", 3))
try:
await asyncio.gather(task_ok_2, task_err_2, task_ok_3)
except RuntimeError as e:
print(f"[{time.strftime('%H:%M:%S')}] 捕获到 gather 抛出的异常: {e}")
# 此时,其他未完成的任务可能仍在运行,需要手动取消
for task in [task_ok_2, task_err_2, task_ok_3]:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"[{time.strftime('%H:%M:%S')}] 未完成任务 {task.get_name()} 被取消。")
print(f"[{time.strftime('%H:%M:%S')}] 主程序结束")
if __name__ == "__main__":
# asyncio.run() 是运行 asyncio 应用程序的推荐入口点。
# 它负责启动事件循环,运行顶层协程,并在协程完成后关闭事件循环。
asyncio.run(main())
执行结果
[10:47:43] 主程序开始
[10:47:43] Alice 开始等待 3 秒...
[10:47:43] Bob 开始等待 1 秒...
[10:47:43] Charlie 开始等待 2 秒...
[10:47:44] Bob 等待结束!
[10:47:45] Charlie 等待结束!
[10:47:46] Alice 等待结束!
[10:47:46] 所有任务完成,结果: ['Alice 完成', 'Bob 完成', 'Charlie 完成']
[10:47:46] 演示任务取消
[10:47:46] David (可取消) 开始等待 5 秒...
[10:47:47] David (可取消) 被取消了!
[10:47:47] 演示任务超时
[10:47:47] Eve 开始等待 4 秒...
[10:47:49] Eve 任务超时了!
[10:47:49] Eve 任务被取消。
[10:47:49] 演示异常处理
[10:47:50] 捕获到异常: 这是一个模拟的错误!
[10:47:50] Frank 开始等待 1 秒...
[10:47:50] Grace 开始等待 2 秒...
[10:47:51] Frank 等待结束!
[10:47:52] Grace 任务发生异常: Grace 模拟运行时错误
[10:47:52] gather 结果 (包含异常): ['Frank 完成', RuntimeError('Grace 模拟运行时错误')]
[10:47:52] 演示 gather 遇到异常时停止 (默认行为)
[10:47:52] Heidi 开始等待 1 秒...
[10:47:52] Ivan 开始等待 2 秒...
[10:47:52] Judy 开始等待 3 秒...
[10:47:53] Heidi 等待结束!
[10:47:54] Ivan 任务发生异常: Ivan 模拟运行时错误
[10:47:54] 捕获到 gather 抛出的异常: Ivan 模拟运行时错误
[10:47:54] 未完成任务 Task-12 被取消。
[10:47:54] 主程序结束
进程已结束,退出代码为 0
引用链接
[1]
Python协程实现 | 李乾坤的博客: https://qiankunli.github.io/2024/07/19/python_asyncio.html[2]
协程与任务 — Python 3.13.9 文档: https://docs.python.org/zh-cn/3.13/library/asyncio-task.html[3]
Python's asyncio: A Hands-On Walkthrough – Real Python: https://realpython.com/async-io-python/
评论区