字数 9023,阅读大约需 46 分钟
Python async 异步编程
Python异步编程(asyncio)完整详细指南
目录
1. 核心概念深入解析[1]
2. 关键语法详解[2]
3. 事件循环机制[3]
4. 协程生命周期[4]
5. 并发模式对比[5]
6. 常用编程模式[6]
7. 高级特性详解[7]
8. 异常处理机制[8]
9. 性能优化策略[9]
10. 实际应用场景[10]
11. 最佳实践[11]
12. 生态系统与第三方库[12]
核心概念深入解析
1.1 异步编程基础
什么是异步编程?
• 异步编程允许程序在等待某个操作完成时,继续执行其他任务
• 通过协作式多任务实现,而非抢占式多任务
• 在单线程内实现并发,大大减少了线程间的复杂同步问题(但如果共享状态的读写跨越了 await,仍需要 asyncio.Lock 等同步原语,见 7.3 节)
并发 vs 并行
并发(Concurrency): 多个任务交替执行,给人同时进行的错觉
┌─────┐ ┌─────┐ ┌─────┐
│Task1│ │Task2│ │Task1│ 时间轴 →
└─────┘ └─────┘ └─────┘
并行(Parallelism): 多个任务真正同时执行
┌─────┐
│Task1│ CPU核心1
└─────┘
┌─────┐
│Task2│ CPU核心2
└─────┘
1.2 协程(Coroutines)深入理解
协程的本质
• 协程是可以暂停和恢复执行的函数
• 保存执行状态,包括局部变量、执行位置等
• 通过 `await` 点主动让出控制权(底层基于生成器的 `yield` 机制实现)
协程状态转换
创建 → 运行 → 暂停 → 恢复 → 完成
↓ ↓ ↓ ↓ ↓
CORO_CREATED → CORO_RUNNING → CORO_SUSPENDED → CORO_RUNNING → CORO_CLOSED
(状态名即 inspect.getcoroutinestate() 的返回值)
协程对象属性
import asyncio


async def example():
    """Return a fixed greeting; used only to inspect coroutine objects."""
    return "Hello"


# Calling a coroutine function only builds a coroutine object; its body has not run yet.
coro = example()

# Report a few attributes of the not-yet-started coroutine object.
# cr_running is a bool that is True only while the coroutine is executing.
_attribute_report = (
    ("协程对象", coro),
    ("协程名称", coro.__name__),
    ("协程状态", coro.cr_running),
)
for _label, _value in _attribute_report:
    print(f"{_label}: {_value}")

# An event loop must drive the coroutine before its return value is available.
result = asyncio.run(coro)
关键语法详解
2.1 async/await 语法规则
async def 的使用规则
import asyncio


# ✅ Valid usage
async def valid_coroutine():
    """Simplest coroutine: an `async def` that returns a value."""
    return "valid"


async def with_await():
    """Show that `await` is allowed inside `async def`.

    asyncio.sleep() resolves to None by default, so this returns None.
    """
    result = await asyncio.sleep(1)  # await may be used here
    return result


async def with_return():
    """Show that coroutines can return values like ordinary functions."""
    return "可以返回值"


# ✅ Async generator: an `async def` that contains `yield`
async def async_generator():
    """Yield 0, 1, 2, pausing 0.1 s after each value."""
    for i in range(3):
        yield i
        await asyncio.sleep(0.1)


# ❌ Invalid usage. These are kept as comments because each one is a
# SyntaxError that would stop this whole snippet from compiling:
#
# async def invalid():
#     yield from range(3)  # SyntaxError: 'yield from' is not allowed in an async function
#
# def sync_function():
#     result = await something()  # SyntaxError: 'await' is only valid inside an async function
await 的使用规则
import asyncio


async def another_coroutine():
    """Plain coroutine used to show that coroutines can await one another."""
    await asyncio.sleep(0)
    return "another"


async def awaitable_example(delay=1):
    """Demonstrate the three kinds of awaitables: coroutines, Futures and Tasks.

    Args:
        delay: seconds for the initial asyncio.sleep (default 1).

    Returns:
        A tuple with the values produced by the awaited coroutine, Future and Task.
    """
    # ✅ Objects that can be awaited
    await asyncio.sleep(delay)  # built-in coroutine
    coro_result = await another_coroutine()  # another coroutine

    # Future: a bare asyncio.Future() never completes on its own, so awaiting it
    # blocks forever. Something must set its result; here the loop does so on
    # its next iteration.
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(future.set_result, "future done")
    future_result = await future

    # Task: it must wrap a coroutine. asyncio.Task() without one raises TypeError,
    # and create_task() is the recommended constructor.
    task = asyncio.create_task(another_coroutine())
    task_result = await task

    # ❌ Objects that cannot be awaited
    # await time.sleep(1)  # ordinary function: blocks the loop and returns None
    # await "string"       # ordinary object: TypeError
    return coro_result, future_result, task_result
2.2 协程函数 vs 协程对象
import asyncio


async def my_coroutine():
    """Return a greeting once awaited."""
    return "Hello World"


# Coroutine *function*: the definition itself is an ordinary function object.
print(type(my_coroutine))  # <class 'function'>

# Coroutine *object*: what calling the coroutine function returns; its body has not run yet.
coro_obj = my_coroutine()
print(type(coro_obj))  # <class 'coroutine'>

# Running the coroutine object on an event loop produces its return value.
result = asyncio.run(coro_obj)
print(result)  # Hello World
事件循环机制
3.1 事件循环工作原理
事件循环的核心职责
1. 调度协程: 决定哪个协程应该运行
2. 处理I/O: 监控I/O操作完成状态
3. 执行回调: 处理定时器和回调函数
4. 管理任务: 创建、取消、监控任务状态
事件循环执行流程
1. 检查就绪的协程
2. 运行就绪的任务和回调,直到协程在 await 处挂起(主动让出控制权)
3. 处理I/O事件和回调
4. 检查定时器
5. 重复上述过程
3.2 事件循环操作详解
import asyncio


async def loop_info():
    """Inspect the running event loop and schedule three demo callbacks.

    Returns:
        The handles from call_soon, call_later and call_at, in that order, so
        the caller can cancel them. The loop must keep running for the delayed
        callbacks to fire.
    """
    # Get the event loop that is running this coroutine.
    loop = asyncio.get_running_loop()
    print(f"循环是否运行: {loop.is_running()}")
    print(f"循环是否关闭: {loop.is_closed()}")

    # Schedule a callback for the next loop iteration.
    soon_handle = loop.call_soon(callback_function)

    # Schedule a callback after a relative delay, in seconds.
    later_handle = loop.call_later(5, delayed_callback)

    # Schedule a callback at an absolute time. call_at() is measured on the
    # loop's own monotonic clock (loop.time()), NOT wall-clock time.time().
    # Passing time.time() would schedule the callback decades in the future.
    at_handle = loop.call_at(loop.time() + 10, timed_callback)

    return soon_handle, later_handle, at_handle


def callback_function():
    print("立即执行的回调")


def delayed_callback():
    print("5秒后执行的回调")


def timed_callback():
    print("定时执行的回调")
3.3 多种运行方式
import asyncio


async def main():
    """Entry coroutine shared by the run styles shown below."""
    print("主协程执行")


# Style 1 (recommended): asyncio.run() creates, runs and closes a fresh loop.
asyncio.run(main())

# Style 2 (not recommended): create and close the event loop yourself.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # A manually created loop must always be closed, even if the coroutine failed.
    loop.close()


# Style 3: inside a coroutine, fetch the loop that is already running it.
async def in_running_loop():
    """Obtain the already-running loop; work scheduled on it would go here."""
    # get_running_loop() only works from code the loop is currently executing.
    current_loop = asyncio.get_running_loop()
    # ...operate on current_loop here (schedule callbacks, run executors, ...)
协程生命周期
4.1 协程状态管理
import asyncio
import inspect


async def lifecycle_demo(delay=1):
    """Print at start and end, suspending once in between.

    Args:
        delay: seconds to stay suspended at the await point (default 1).

    Returns:
        The string "完成".
    """
    print("协程开始执行")
    await asyncio.sleep(delay)  # the coroutine is CORO_SUSPENDED while waiting here
    print("协程即将结束")
    return "完成"


async def monitor_coroutine(delay=1):
    """Drive one coroutine through its lifecycle, printing its state along the way.

    Args:
        delay: forwarded to lifecycle_demo().

    Returns:
        The coroutine's result.
    """
    # Create the coroutine object; its body has not started yet.
    coro = lifecycle_demo(delay)
    print(f"协程状态: {inspect.getcoroutinestate(coro)}")
    # Output: CORO_CREATED

    # Wrapping it in a Task hands it to the event loop for scheduling.
    task = asyncio.create_task(coro)
    print(f"任务已创建: {task}")
    print(f"任务是否完成: {task.done()}")

    # Wait for completion.
    result = await task
    print(f"任务结果: {result}")
    print(f"任务是否完成: {task.done()}")

    # Once it has returned, the coroutine object is finished for good.
    print(f"协程状态: {inspect.getcoroutinestate(coro)}")
    # Output: CORO_CLOSED
    return result


# Guarded so importing this module does not start an event loop.
if __name__ == "__main__":
    asyncio.run(monitor_coroutine())
4.2 任务(Task)详解
import asyncio


async def task_operations(delay=2):
    """Run two Tasks concurrently, inspect one, then collect both results.

    Args:
        delay: seconds each slow_operation() sleeps (default 2).
    """
    # Both Tasks start running as soon as this coroutine yields to the loop.
    task1 = asyncio.create_task(slow_operation(1, delay))
    task2 = asyncio.create_task(slow_operation(2, delay))

    # Task attributes
    print(f"任务名称: {task1.get_name()}")
    print(f"任务是否取消: {task1.cancelled()}")
    print(f"任务是否完成: {task1.done()}")

    # Awaiting a Task waits for it to finish and returns its result (or raises).
    try:
        result1 = await task1
        result2 = await task2
        print(f"结果: {result1}, {result2}")
    except asyncio.CancelledError:
        print("任务被取消")
        # Don't leave the sibling Task running unobserved.
        task2.cancel()


async def slow_operation(id, delay=2):
    """Sleep for `delay` seconds, then report completion.

    The parameter keeps the name `id`, which shadows the builtin, so that
    existing keyword callers keep working.
    """
    await asyncio.sleep(delay)
    return f"操作{id}完成"


# Task cancellation example
async def cancellation_demo(delay=2, cancel_after=1):
    """Cancel a running Task and report whether it ended up cancelled.

    Args:
        delay: how long the Task would otherwise run.
        cancel_after: seconds to wait before calling cancel().

    Returns:
        True if the Task was cancelled. False if it had already finished
        before cancel() was called.
    """
    task = asyncio.create_task(slow_operation(1, delay))
    # Let it run for a while, then cancel it.
    await asyncio.sleep(cancel_after)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("任务被成功取消")
    return task.cancelled()


if __name__ == "__main__":
    asyncio.run(task_operations())
    asyncio.run(cancellation_demo())
并发模式对比
5.1 四种并发模式详细对比
5.2 性能对比实例
import asyncio
import concurrent.futures
import multiprocessing
import threading
import time

import aiohttp
import requests

# Upper bound, in seconds, on any single network call in this comparison.
REQUEST_TIMEOUT = 10


# Synchronous version
def sync_fetch(url):
    """Download `url` with requests and return the body size in bytes."""
    # Without a timeout, requests can wait forever on an unresponsive server.
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    return len(response.content)


def sync_main(urls):
    """Fetch the URLs one after another and print the elapsed time."""
    start = time.perf_counter()  # monotonic clock for measuring durations
    results = [sync_fetch(url) for url in urls]
    end = time.perf_counter()
    print(f"同步版本耗时: {end - start:.2f}秒")
    return results


# Multithreaded version
def thread_main(urls):
    """Fetch the URLs on a thread pool; blocking I/O overlaps across threads."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(sync_fetch, urls))
    end = time.perf_counter()
    print(f"多线程版本耗时: {end - start:.2f}秒")
    return results


# Asynchronous version
async def async_fetch(session, url):
    """Download `url` through a shared aiohttp session; return the body size."""
    async with session.get(url) as response:
        content = await response.read()
        return len(content)


async def async_main(urls):
    """Fetch all URLs concurrently on a single event loop."""
    start = time.perf_counter()
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    end = time.perf_counter()
    print(f"异步版本耗时: {end - start:.2f}秒")
    return results


# Test URLs
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

# Expected results:
# synchronous:    ~5 s (5 requests x 1 s delay)
# multithreaded:  ~1 s (requests run in parallel threads)
# asynchronous:   ~1 s (requests run concurrently, with less overhead)

if __name__ == "__main__":
    sync_main(urls)
    thread_main(urls)
    asyncio.run(async_main(urls))
常用编程模式
6.1 并发执行模式
模式1: 等待所有任务完成
import asyncio


async def gather_pattern():
    """Run three fetches concurrently, then report each outcome in order."""
    # return_exceptions=True: a failure comes back as a value in the results
    # list instead of being raised out of gather().
    outcomes = await asyncio.gather(
        *(fetch_data(n) for n in (1, 2, 3)),
        return_exceptions=True,
    )
    for position, outcome in enumerate(outcomes, start=1):
        message = (
            f"任务{position}失败: {outcome}"
            if isinstance(outcome, Exception)
            else f"任务{position}成功: {outcome}"
        )
        print(message)


async def fetch_data(id):
    """Sleep `id` seconds, then return a payload; id 2 always fails."""
    await asyncio.sleep(id)
    if id == 2:
        raise ValueError(f"任务{id}出错")
    return f"数据{id}"
模式2: 按完成顺序处理
async def as_completed_pattern():
    """Start three fetches and handle each result as soon as it finishes."""
    tasks = [asyncio.create_task(fetch_data(delay)) for delay in (3, 1, 2)]
    # as_completed yields awaitables in completion order, not submission order.
    for next_done in asyncio.as_completed(tasks):
        try:
            print(f"完成: {await next_done}")
        except Exception as e:
            print(f"错误: {e}")
模式3: 等待第一个完成
async def wait_pattern():
    """Start three fetches, report the first to finish, and cancel the rest.

    If a finished task raised, its exception propagates to the caller, but
    only after the remaining tasks have been cancelled.
    """
    tasks = [
        asyncio.create_task(fetch_data(3)),
        asyncio.create_task(fetch_data(1)),
        asyncio.create_task(fetch_data(2)),
    ]

    # Wait until at least one task finishes, successfully or with an error.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    try:
        # Handle the finished task(s).
        for task in done:
            result = await task
            print(f"第一个完成: {result}")
    finally:
        # Runs even if a finished task raised above, so nothing is left running
        # in the background. Awaiting the cancelled tasks lets their
        # cancellation complete before we return.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
6.2 生产者-消费者模式详解
import asyncio
import random


class AsyncProducerConsumer:
    """Producer/consumer demo built on a bounded asyncio.Queue.

    Producers put items on the queue. Consumers keep taking items until
    `running` is False and the queue is empty.
    """

    def __init__(self, queue_size=10):
        # Bounded queue: put() waits while the queue is full (back-pressure).
        self.queue = asyncio.Queue(maxsize=queue_size)
        # Cleared by run_simulation() once every produced item has been processed.
        self.running = True

    async def producer(self, name, item_count):
        """Producer coroutine: put `item_count` items on the queue."""
        for i in range(item_count):
            # Simulate production time
            await asyncio.sleep(random.uniform(0.1, 0.5))
            item = f"{name}-item-{i}"
            await self.queue.put(item)
            print(f"🏭 {name} 生产了: {item} (队列大小: {self.queue.qsize()})")
        print(f"✅ 生产者 {name} 完成")

    async def consumer(self, name):
        """Consumer coroutine: process items until stopped and the queue is drained."""
        while self.running or not self.queue.empty():
            try:
                # Time out so the loop can re-check `running` instead of waiting forever.
                item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                if not self.running:
                    break
                continue
            try:
                # Simulate processing time
                await asyncio.sleep(random.uniform(0.2, 0.8))
                print(f"🔧 {name} 处理了: {item} (队列剩余: {self.queue.qsize()})")
            finally:
                # Always mark the item done, even if processing fails or is
                # cancelled. Otherwise queue.join() in run_simulation() never returns.
                self.queue.task_done()
        print(f"✅ 消费者 {name} 完成")

    async def run_simulation(self):
        """Run 2 producers and 3 consumers until every produced item is processed."""
        # Producers are created first, then consumers.
        producers = [
            asyncio.create_task(self.producer("Producer-1", 5)),
            asyncio.create_task(self.producer("Producer-2", 3)),
        ]
        consumers = [
            asyncio.create_task(self.consumer("Consumer-A")),
            asyncio.create_task(self.consumer("Consumer-B")),
            asyncio.create_task(self.consumer("Consumer-C")),
        ]

        # Wait for all producers to finish
        await asyncio.gather(*producers)
        print("📢 所有生产者已完成,等待消费者处理完队列...")

        # Wait until every queued item has been marked task_done()
        await self.queue.join()

        # Tell the consumers to stop, then wait for them to exit
        self.running = False
        await asyncio.gather(*consumers)


# Usage example
async def main():
    simulator = AsyncProducerConsumer(queue_size=5)
    await simulator.run_simulation()


if __name__ == "__main__":
    asyncio.run(main())
6.3 协程链式调用模式
import asyncio


class DataPipeline:
    """Data-processing pipeline: fetch -> validate -> transform -> save."""

    async def fetch_raw_data(self, source_id):
        """Step 1: fetch the raw data for one source."""
        print(f"📥 获取数据源 {source_id}")
        await asyncio.sleep(0.5)  # simulate a network request
        return {"source_id": source_id, "raw_data": f"raw_{source_id}"}

    async def validate_data(self, data):
        """Step 2: validate the record; raises ValueError for ids divisible by 3."""
        print(f"✅ 验证数据 {data['source_id']}")
        await asyncio.sleep(0.2)  # simulate validation
        if data["source_id"] % 3 == 0:  # simulated validation failure
            raise ValueError(f"数据源 {data['source_id']} 验证失败")
        data["validated"] = True
        return data

    async def transform_data(self, data):
        """Step 3: upper-case the raw payload into `transformed_data`."""
        print(f"🔄 转换数据 {data['source_id']}")
        await asyncio.sleep(0.3)  # simulate transformation
        data["transformed_data"] = data["raw_data"].upper()
        return data

    async def save_data(self, data):
        """Step 4: mark the record as saved."""
        print(f"💾 保存数据 {data['source_id']}")
        await asyncio.sleep(0.1)  # simulate saving
        data["saved"] = True
        return data

    async def process_single_source(self, source_id):
        """Run one source through every step.

        Returns:
            The final record, or None if any step raised. The error is printed,
            not re-raised.
        """
        try:
            # Each step awaits the previous one: sequential within one source.
            data = await self.fetch_raw_data(source_id)
            data = await self.validate_data(data)
            data = await self.transform_data(data)
            data = await self.save_data(data)
            print(f"✨ 数据源 {source_id} 处理完成")
            return data
        except Exception as e:
            print(f"❌ 数据源 {source_id} 处理失败: {e}")
            return None

    async def process_multiple_sources(self, source_ids):
        """Process many sources concurrently and return the successful records.

        Args:
            source_ids: any iterable of ids. It is materialized once, so
                generators and iterators work as well as lists and ranges.
        """
        source_ids = list(source_ids)
        print(f"🚀 开始处理 {len(source_ids)} 个数据源")
        # Run every source's pipeline concurrently.
        results = await asyncio.gather(
            *[self.process_single_source(sid) for sid in source_ids],
            return_exceptions=True,
        )
        # Tally results: None (handled failure) and exceptions both count as failed.
        successful = [r for r in results if r is not None and not isinstance(r, Exception)]
        failed = len(results) - len(successful)
        print(f"📊 处理完成: {len(successful)} 成功, {failed} 失败")
        return successful


# Usage example
async def main():
    pipeline = DataPipeline()
    # Process sources 1-6
    results = await pipeline.process_multiple_sources(range(1, 7))
    print(f"\n📈 最终结果: 成功处理 {len(results)} 个数据源")


if __name__ == "__main__":
    asyncio.run(main())
高级特性详解
7.1 异步迭代器与异步生成器
import asyncio


# Async iterator class
class AsyncRange:
    """Async counterpart of range(start, stop) that pauses 0.1 s per value.

    Single-use: iterating advances self.start, so an exhausted instance
    yields nothing if iterated again.
    """

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        current = self.start
        self.start += 1
        # Simulate asynchronous work
        await asyncio.sleep(0.1)
        return current


# Async generator
async def async_fibonacci(n):
    """Yield the first n Fibonacci numbers, pausing 0.05 s after each."""
    a, b = 0, 1
    count = 0
    while count < n:
        yield a
        a, b = b, a + b
        count += 1
        # Pause after each value so other coroutines get a chance to run
        await asyncio.sleep(0.05)


# Async data-stream processor
class AsyncDataStream:
    """Async iterable that yields a processed version of each source item."""

    def __init__(self, data_source):
        self.data_source = data_source

    async def __aiter__(self):
        # Written as an async generator: calling __aiter__() returns an async
        # generator object, which is itself a valid async iterator.
        for item in self.data_source:
            # Simulate asynchronous data handling
            await asyncio.sleep(0.1)
            processed = await self.process_item(item)
            yield processed

    async def process_item(self, item):
        """Process one data item asynchronously."""
        await asyncio.sleep(0.05)  # simulate processing time
        return f"processed_{item}"


# Usage example
async def async_iteration_demo():
    print("=== 异步迭代器示例 ===")
    async for num in AsyncRange(1, 5):
        print(f"异步迭代器产生: {num}")

    print("\n=== 异步生成器示例 ===")
    async for fib in async_fibonacci(8):
        print(f"斐波那契数: {fib}")

    print("\n=== 异步列表推导式 ===")
    squares = [x**2 async for x in AsyncRange(1, 6)]
    print(f"异步生成的平方数: {squares}")

    print("\n=== 异步数据流处理 ===")
    stream = AsyncDataStream(['a', 'b', 'c', 'd'])
    async for processed in stream:
        print(f"流处理结果: {processed}")


if __name__ == "__main__":
    asyncio.run(async_iteration_demo())
7.2 异步上下文管理器
import asyncio


# Custom async context manager
class AsyncDatabaseConnection:
    """Simulated database connection opened/closed via `async with`."""

    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None

    async def __aenter__(self):
        print(f"🔌 连接到数据库: {self.db_url}")
        await asyncio.sleep(0.1)  # simulate connection time
        self.connection = f"connection_to_{self.db_url}"
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("📴 关闭数据库连接")
        await asyncio.sleep(0.05)  # simulate closing time
        self.connection = None
        if exc_type:
            print(f"⚠️ 异常发生: {exc_type.__name__}: {exc_val}")
        # Returning False means the exception (if any) is NOT suppressed.
        return False


# Async resource manager
class AsyncResourceManager:
    """Resource that may only be used between __aenter__ and __aexit__."""

    def __init__(self, resource_name):
        self.resource_name = resource_name
        self.acquired = False

    async def __aenter__(self):
        print(f"🔓 获取资源: {self.resource_name}")
        await asyncio.sleep(0.1)
        self.acquired = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"🔒 释放资源: {self.resource_name}")
        await asyncio.sleep(0.05)
        self.acquired = False

    async def do_work(self):
        """Use the resource; raises RuntimeError if it has not been acquired."""
        if not self.acquired:
            raise RuntimeError("资源未获取")
        print(f"⚙️ 使用资源 {self.resource_name} 进行工作")
        await asyncio.sleep(0.2)
        return f"work_result_from_{self.resource_name}"


# Usage example
async def context_manager_demo():
    print("=== 异步数据库连接示例 ===")
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        print(f"🗃️ 使用连接: {conn}")
        await asyncio.sleep(0.1)

    print("\n=== 异步资源管理示例 ===")
    async with AsyncResourceManager("critical_resource") as resource:
        result = await resource.do_work()
        print(f"📊 工作结果: {result}")

    print("\n=== 异常处理示例 ===")
    try:
        async with AsyncDatabaseConnection("invalid://db") as conn:
            raise ValueError("模拟业务异常")
            print("这行不会执行")  # unreachable on purpose: the body stops at the raise
    except ValueError as e:
        print(f"🚨 捕获异常: {e}")

    print("\n=== 文件I/O示例 ===")
    try:
        # Third-party aiofiles is imported here, not at module level, so that a
        # missing install is reported by the except below instead of breaking
        # the whole module at import time.
        import aiofiles

        async with aiofiles.open('temp_file.txt', 'w') as f:
            await f.write("异步文件写入测试\n")
            await f.write("第二行内容\n")

        async with aiofiles.open('temp_file.txt', 'r') as f:
            content = await f.read()
            print(f"📄 文件内容:\n{content}")
    except Exception as e:
        print(f"文件操作跳过: {e}")


if __name__ == "__main__":
    asyncio.run(context_manager_demo())
7.3 异步信号量与锁
import asyncio
import random


class AsyncConcurrencyControl:
    """Demonstrates asyncio synchronization primitives around shared state."""

    def __init__(self):
        # Semaphore: at most 3 coroutines inside semaphore_demo at a time
        self.semaphore = asyncio.Semaphore(3)
        # Mutex: protects shared_counter during its read-modify-write
        self.lock = asyncio.Lock()
        # Event: one-shot broadcast from event_setter to every event_waiter
        self.event = asyncio.Event()
        # Condition: consumers wait until shared_list is non-empty
        self.condition = asyncio.Condition()
        # Shared resources
        self.shared_counter = 0
        self.shared_list = []

    async def semaphore_demo(self, worker_id):
        """Do 1-3 s of simulated work while holding one semaphore slot."""
        async with self.semaphore:
            print(f"🔧 工作者 {worker_id} 开始工作")
            work_time = random.uniform(1, 3)
            await asyncio.sleep(work_time)
            print(f"✅ 工作者 {worker_id} 完成工作 (耗时 {work_time:.1f}s)")

    async def lock_demo(self, worker_id):
        """Increment shared_counter while holding the lock.

        The read and the write are separated by an await. Without the lock,
        concurrent workers could read the same old value and lose updates.
        """
        async with self.lock:
            print(f"🔒 工作者 {worker_id} 获得锁")
            old_value = self.shared_counter
            await asyncio.sleep(0.1)  # simulate processing time
            self.shared_counter = old_value + 1
            print(f"📊 工作者 {worker_id} 更新计数器: {old_value} → {self.shared_counter}")

    async def event_waiter(self, waiter_id):
        """Block until the event is set."""
        print(f"⏳ 等待者 {waiter_id} 开始等待事件")
        await self.event.wait()
        print(f"🎉 等待者 {waiter_id} 收到事件通知")

    async def event_setter(self):
        """Set the event after 2 seconds, waking all waiters."""
        await asyncio.sleep(2)
        print("📡 事件被设置")
        self.event.set()

    async def condition_consumer(self, consumer_id):
        """Wait until shared_list has an item, then pop the oldest one."""
        async with self.condition:
            # wait_for releases the lock while waiting and re-checks the predicate on wake-up.
            await self.condition.wait_for(lambda: len(self.shared_list) > 0)
            item = self.shared_list.pop(0)
            print(f"🔽 消费者 {consumer_id} 消费了: {item}")

    async def condition_producer(self, producer_id):
        """Append three items, one per second, notifying waiters after each."""
        for i in range(3):
            await asyncio.sleep(1)
            async with self.condition:
                item = f"item_{producer_id}_{i}"
                self.shared_list.append(item)
                print(f"🔼 生产者 {producer_id} 生产了: {item}")
                self.condition.notify_all()  # wake every waiter; each re-checks its predicate


# Concurrency-control demo
async def concurrency_control_demo():
    controller = AsyncConcurrencyControl()

    print("=== 信号量限制并发数示例 ===")
    # 5 workers, but the semaphore admits only 3 at a time
    semaphore_tasks = [
        asyncio.create_task(controller.semaphore_demo(i))
        for i in range(1, 6)
    ]
    await asyncio.gather(*semaphore_tasks)

    print("\n=== 互斥锁保护共享资源示例 ===")
    # Several workers update the shared counter concurrently
    lock_tasks = [
        asyncio.create_task(controller.lock_demo(i))
        for i in range(1, 6)
    ]
    await asyncio.gather(*lock_tasks)
    print(f"🏁 最终计数器值: {controller.shared_counter}")

    print("\n=== 事件通知示例 ===")
    # Waiters plus the task that sets the event
    event_tasks = [
        asyncio.create_task(controller.event_waiter(i))
        for i in range(1, 4)
    ]
    event_tasks.append(asyncio.create_task(controller.event_setter()))
    await asyncio.gather(*event_tasks)

    print("\n=== 条件变量示例 ===")
    # Producer/consumer coordinated through the condition variable
    condition_tasks = [
        asyncio.create_task(controller.condition_producer(1)),
        asyncio.create_task(controller.condition_consumer(1)),
        asyncio.create_task(controller.condition_consumer(2)),
    ]
    await asyncio.gather(*condition_tasks)


if __name__ == "__main__":
    asyncio.run(concurrency_control_demo())
异常处理机制
8.1 异步异常处理基础
import asyncio


class AsyncExceptionHandling:
    """Examples of try/except/else/finally, timeouts and gather() errors in coroutines."""

    async def basic_exception_demo(self):
        """Handle each possible outcome of failing_operation() separately."""
        try:
            await self.failing_operation()
        except ValueError as e:
            print(f"❌ 捕获ValueError: {e}")
        except asyncio.TimeoutError as e:
            print(f"⏰ 操作超时: {e}")
        except Exception as e:
            print(f"🚨 未知异常: {type(e).__name__}: {e}")
        else:
            print("✅ 操作成功完成")
        finally:
            print("🧹 清理操作")

    async def failing_operation(self):
        """Randomly raise ValueError, raise asyncio.TimeoutError, or succeed."""
        import random
        await asyncio.sleep(0.1)
        error_type = random.choice(['value', 'timeout', 'success'])
        if error_type == 'value':
            raise ValueError("数据验证失败")
        elif error_type == 'timeout':
            raise asyncio.TimeoutError("操作超时")
        else:
            return "操作成功"

    async def timeout_handling_demo(self):
        """Cancel a 3-second operation that exceeds a 2-second limit."""
        try:
            # wait_for cancels the inner awaitable when the timeout expires
            result = await asyncio.wait_for(
                self.slow_operation(3),  # needs 3 seconds
                timeout=2.0,
            )
            print(f"✅ 操作完成: {result}")
        except asyncio.TimeoutError:
            print("⏰ 操作超时,已取消")

    async def slow_operation(self, duration):
        """Simulate a slow operation lasting `duration` seconds."""
        print(f"🐌 开始慢操作 (需要{duration}秒)")
        await asyncio.sleep(duration)
        return f"完成{duration}秒操作"

    async def gather_exception_demo(self):
        """Contrast gather()'s two error modes."""
        print("\n--- gather异常处理 (return_exceptions=False) ---")
        try:
            # Default return_exceptions=False: the first exception propagates out
            # of gather(); the other awaitables are not cancelled by this.
            await asyncio.gather(
                self.operation_success("A"),
                self.operation_failure("B"),  # this one fails
                self.operation_success("C"),
            )
        except ValueError as e:
            print(f"❌ gather被异常中断: {e}")

        print("\n--- gather异常处理 (return_exceptions=True) ---")
        results = await asyncio.gather(
            self.operation_success("A"),
            self.operation_failure("B"),  # this one fails
            self.operation_success("C"),
            return_exceptions=True,  # exceptions come back as results
        )
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"🔴 任务{i}异常: {result}")
            else:
                print(f"🟢 任务{i}成功: {result}")

    async def operation_success(self, name):
        await asyncio.sleep(0.1)
        return f"操作{name}成功"

    async def operation_failure(self, name):
        await asyncio.sleep(0.1)
        raise ValueError(f"操作{name}失败")
# Python 3.11+ 异常组处理
class AsyncExceptionGroups:
async def exception_group_demo(self):
"""异常组处理示例 (Python 3.11+)"""
try:
results = await asyncio.gather(
self.coro_error_a(),
self.coro_error_b(),
self.coro_error_c(),
return_exceptions=True
)
# 收集所有异常
exceptions = [r for r in results if isinstance(r, Exception)]
if exceptions:
raise ExceptionGroup("批量操作失败", exceptions)
except* ValueError as eg:
print(f"🔴 处理ValueError组: {len(eg.exceptions)} 个")
for exc in eg.exceptions:
print(f" - {exc}")
except* TypeError as eg:
print(f"🟠 处理TypeError组: {len(eg.exceptions)} 个")
for exc in eg.exceptions:
print(f" - {exc}")
except* KeyError as eg:
print(f"🟡 处理KeyError组: {len(eg.exceptions)} 个")
for exc in eg.exceptions:
print(f" - {exc}")
async def coro_error_a(self):
await asyncio.sleep(0.1)
raise ValueError("A操作失败")
async def coro_error_b(self):
await asyncio.sleep(0.1)
raise TypeError("B操作类型错误")
async def coro_error_c(self):
await asyncio.sleep(0.1)
raise KeyError("C操作键错误")
# Exception-handling best practices
class AsyncExceptionBestPractices:
    """Retry an unreliable operation with a per-attempt timeout and backoff."""

    async def robust_operation_demo(self, max_retries=3, retry_delay=1, timeout=5.0):
        """Call unreliable_operation() until it succeeds or the retries run out.

        Each attempt is limited to `timeout` seconds. The pause between attempts
        starts at `retry_delay` seconds and doubles after every failed attempt
        (exponential backoff), whether the attempt timed out or raised.

        Returns:
            The operation's result.

        Raises:
            The last attempt's exception if every attempt fails.
        """
        delay = retry_delay
        for attempt in range(1, max_retries + 1):
            is_last = attempt == max_retries
            try:
                result = await asyncio.wait_for(self.unreliable_operation(), timeout=timeout)
            except asyncio.TimeoutError:
                print(f"⏰ 第{attempt}次尝试超时")
                if is_last:
                    print("❌ 所有重试都超时,操作失败")
                    raise
            except Exception as e:
                print(f"🚨 第{attempt}次尝试出错: {e}")
                if is_last:
                    print("❌ 所有重试都失败")
                    raise
            else:
                print(f"✅ 操作成功: {result}")
                return result
            # Reached only after a failed, non-final attempt.
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff, applied to both failure kinds

    async def unreliable_operation(self):
        """Simulate a flaky call: random 0.1-6 s delay, then fails 70% of the time."""
        import random
        await asyncio.sleep(random.uniform(0.1, 6))  # random latency
        if random.random() < 0.7:  # 70% failure rate
            raise ConnectionError("网络连接不稳定")
        return "操作成功完成"
import asyncio
import builtins


# Run the exception-handling examples
async def exception_handling_demo():
    """Run every exception-handling example in turn."""
    handler = AsyncExceptionHandling()

    print("=== 基础异常处理 ===")
    await handler.basic_exception_demo()

    print("\n=== 超时处理 ===")
    await handler.timeout_handling_demo()

    print("\n=== gather异常处理 ===")
    await handler.gather_exception_demo()

    # ExceptionGroup became a builtin in Python 3.11. `builtins` must be
    # imported for this check; without the import it raises NameError.
    if hasattr(builtins, 'ExceptionGroup'):
        print("\n=== 异常组处理 (Python 3.11+) ===")
        group_handler = AsyncExceptionGroups()
        await group_handler.exception_group_demo()

    print("\n=== 健壮操作示例 ===")
    best_practices = AsyncExceptionBestPractices()
    try:
        await best_practices.robust_operation_demo()
    except Exception as e:
        print(f"最终失败: {e}")


if __name__ == "__main__":
    asyncio.run(exception_handling_demo())
性能优化策略
9.1 性能监控与分析
import asyncio
import time
import functools
import cProfile
import pstats
from contextlib import asynccontextmanager


class AsyncPerformanceAnalyzer:
    """Record wall-clock timings of coroutine calls and profile code regions."""

    def __init__(self):
        # Label -> list of recorded execution times, in seconds.
        self.metrics = {}

    def timer(self, name=None):
        """Decorator factory that times every call of an async function.

        Args:
            name: label under which timings are stored; defaults to
                "<module>.<function name>".
        """
        def decorator(func):
            label = name or f"{func.__module__}.{func.__name__}"

            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                began = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    # Recorded even when the call raises.
                    elapsed = time.perf_counter() - began
                    self.metrics.setdefault(label, []).append(elapsed)
                    print(f"⏱️ {label} 执行时间: {elapsed:.4f}秒")

            return wrapper

        return decorator

    @asynccontextmanager
    async def profile_context(self, name="async_profile"):
        """Profile the enclosed code with cProfile and print the top 10 entries."""
        profiler = cProfile.Profile()
        profiler.enable()
        began = time.perf_counter()
        try:
            yield
        finally:
            finished = time.perf_counter()
            profiler.disable()
            print(f"\n📊 性能分析报告: {name}")
            print(f"总执行时间: {finished - began:.4f}秒")
            # Sorted by cumulative time; only the 10 most expensive functions.
            pstats.Stats(profiler).sort_stats('cumulative').print_stats(10)

    def print_summary(self):
        """Print count, mean, min, max and total time for every label."""
        print("\n📈 性能摘要:")
        for func_name, samples in self.metrics.items():
            total = sum(samples)
            print(f" {func_name}:")
            print(f" 调用次数: {len(samples)}")
            print(f" 平均时间: {total / len(samples):.4f}秒")
            print(f" 最短时间: {min(samples):.4f}秒")
            print(f" 最长时间: {max(samples):.4f}秒")
            print(f" 总计时间: {total:.4f}秒")
# Performance-optimization examples
class AsyncPerformanceOptimization:
    """Compare sequential, unbounded-concurrent and bounded-concurrent requests."""

    # One analyzer shared by the @timer decorators below. Decorators run once,
    # when the class is created; giving each its own AsyncPerformanceAnalyzer()
    # would store the timings in objects nothing else can reach.
    _shared_analyzer = AsyncPerformanceAnalyzer()

    def __init__(self):
        # Exposes the shared analyzer so callers can print_summary() the timings.
        # Timings accumulate across all instances of this class.
        self.analyzer = self._shared_analyzer

    @_shared_analyzer.timer("slow_sequential")
    async def slow_sequential_approach(self, urls):
        """Inefficient: await each request before starting the next."""
        results = []
        for url in urls:
            result = await self.simulate_network_request(url)
            results.append(result)
        return results

    @_shared_analyzer.timer("fast_concurrent")
    async def fast_concurrent_approach(self, urls):
        """Efficient: start every request at once and gather the results."""
        tasks = [self.simulate_network_request(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

    @_shared_analyzer.timer("optimized_concurrent")
    async def optimized_concurrent_approach(self, urls, max_concurrent=10):
        """Concurrent, but with at most `max_concurrent` requests in flight."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def limited_request(url):
            async with semaphore:
                return await self.simulate_network_request(url)

        tasks = [limited_request(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

    async def simulate_network_request(self, url):
        """Simulate a network request with 0.1 s of latency."""
        await asyncio.sleep(0.1)
        return f"Response from {url}"

    async def cpu_bound_task(self, n):
        """CPU-bound loop (a poor fit for asyncio): sum of i*i for i < n.

        It yields to the loop every 100000 iterations so other coroutines are
        not starved, but it never runs faster than plain Python.
        """
        total = 0
        for i in range(n):
            total += i * i
            if i % 100000 == 0:
                await asyncio.sleep(0)  # hand control back to the loop
        return total

    async def memory_efficient_processing(self, large_dataset):
        """Double every item, processing the dataset in 1000-item chunks.

        All results are still collected into one list. Chunking bounds the size
        of each intermediate batch and lets other coroutines run between chunks.
        """
        async def process_chunk(chunk):
            await asyncio.sleep(0.01)  # simulate processing time
            return [item * 2 for item in chunk]

        chunk_size = 1000
        results = []
        for i in range(0, len(large_dataset), chunk_size):
            chunk = large_dataset[i:i + chunk_size]
            processed_chunk = await process_chunk(chunk)
            results.extend(processed_chunk)
            # Periodically yield control explicitly as well
            if i % (chunk_size * 10) == 0:
                await asyncio.sleep(0)
        return results
# Cache optimization
def _build_async_lru_decorator(maxsize, stats_for):
    """Return an LRU-caching decorator for coroutine functions.

    Args:
        maxsize: maximum number of cached results. When it is exceeded, the
            least recently used entry is evicted.
        stats_for: callable taking the call's positional-args tuple and
            returning the dict whose "hits"/"misses" counters to update.
    """
    import functools
    from collections import OrderedDict

    def decorator(func):
        # Per-function cache. OrderedDict keeps recency order with O(1) updates.
        cache = OrderedDict()

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            stats = stats_for(args)
            # String keys tolerate unhashable arguments. For methods the key
            # includes repr(self), so each instance gets its own entries.
            key = str(args) + str(sorted(kwargs.items()))

            if key in cache:
                # Hit: mark the entry as most recently used
                cache.move_to_end(key)
                stats["hits"] += 1
                print(f"💾 缓存命中: {func.__name__}")
                return cache[key]

            # Miss: compute and store the result
            stats["misses"] += 1
            result = await func(*args, **kwargs)
            cache[key] = result
            # Over capacity: drop the least recently used entry
            if len(cache) > maxsize:
                cache.popitem(last=False)
            print(f"💿 缓存存储: {func.__name__}")
            return result

        return wrapper

    return decorator


class AsyncCacheOptimization:
    """Demonstrates memoizing an expensive coroutine with an async LRU cache."""

    def __init__(self):
        self.cache = {}
        self.cache_stats = {"hits": 0, "misses": 0}

    def async_lru_cache(self, maxsize=128):
        """Async LRU cache decorator whose hits/misses go to this instance's cache_stats."""
        return _build_async_lru_decorator(maxsize, lambda args: self.cache_stats)

    # In the class body no instance exists yet, so the instance method
    # async_lru_cache cannot be called here (it would need `self`). Instead the
    # counters come from the instance the method is called on: args[0] is `self`.
    @_build_async_lru_decorator(32, lambda args: args[0].cache_stats)
    async def expensive_computation(self, x, y):
        """Expensive computation: x**y + y**x after a simulated 1 s delay."""
        print(f"🔥 执行昂贵计算: f({x}, {y})")
        await asyncio.sleep(1)  # simulate heavy work
        return x ** y + y ** x

    def print_cache_stats(self):
        """Print the hit rate, if any lookups have happened."""
        total = self.cache_stats["hits"] + self.cache_stats["misses"]
        if total > 0:
            hit_rate = self.cache_stats["hits"] / total * 100
            print(f"📊 缓存统计: 命中率 {hit_rate:.1f}% ({self.cache_stats['hits']}/{total})")
# Connection-pool optimization
class AsyncConnectionPoolOptimization:
    """Simulated connection pool that caps concurrent connections with a semaphore."""

    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        # At most pool_size connections may be checked out at once.
        self.semaphore = asyncio.Semaphore(pool_size)
        self.active_connections = 0
        self.total_requests = 0

    @asynccontextmanager
    async def get_connection(self):
        """Check out a simulated connection for the duration of `async with`."""
        async with self.semaphore:
            self.active_connections += 1
            self.total_requests += 1
            # Capture this request's number now. Other requests can bump
            # total_requests while this one is suspended below, which would
            # otherwise give concurrent requests the same connection name.
            request_number = self.total_requests
            print(f"🔗 获取连接 (活跃: {self.active_connections}/{self.pool_size})")
            try:
                # Simulate connection setup time
                await asyncio.sleep(0.01)
                yield f"connection_{request_number}"
            finally:
                self.active_connections -= 1
                print(f"📤 释放连接 (活跃: {self.active_connections}/{self.pool_size})")

    async def make_request_with_pool(self, request_id):
        """Send one simulated request over a pooled connection."""
        async with self.get_connection() as conn:
            # Simulate request handling
            await asyncio.sleep(0.5)
            return f"Response for request {request_id} via {conn}"
# Performance test runner
async def performance_optimization_demo():
    """Run the sequential/concurrent, caching and connection-pool comparisons."""
    print("=== 性能优化示例 ===")
    optimizer = AsyncPerformanceOptimization()

    # Test URL list
    test_urls = [f"http://api{i}.example.com" for i in range(20)]

    # Compare the approaches
    print("🐌 顺序方法:")
    await optimizer.slow_sequential_approach(test_urls[:5])  # only 5 URLs: sequential is slow

    print("\n🚀 并发方法:")
    await optimizer.fast_concurrent_approach(test_urls)

    print("\n⚡ 优化并发方法:")
    await optimizer.optimized_concurrent_approach(test_urls, max_concurrent=5)

    # Cache test
    print("\n=== 缓存优化测试 ===")
    cache_optimizer = AsyncCacheOptimization()
    # First calls: cache misses
    await cache_optimizer.expensive_computation(2, 3)
    await cache_optimizer.expensive_computation(3, 4)
    # Repeated calls: cache hits
    await cache_optimizer.expensive_computation(2, 3)
    await cache_optimizer.expensive_computation(3, 4)
    cache_optimizer.print_cache_stats()

    # Connection-pool test
    print("\n=== 连接池优化测试 ===")
    pool_optimizer = AsyncConnectionPoolOptimization(pool_size=3)
    # More concurrent requests than pool slots: the extra ones wait for a slot.
    request_tasks = [
        asyncio.create_task(pool_optimizer.make_request_with_pool(i))
        for i in range(8)
    ]
    results = await asyncio.gather(*request_tasks)
    print(f"✅ 完成 {len(results)} 个请求")


# Guarded so importing this module does not start the multi-second benchmark.
if __name__ == "__main__":
    asyncio.run(performance_optimization_demo())
实际应用场景
10.1 Web爬虫应用
import asyncio
import aiohttp
import time
from urllib.parse import urldefrag, urljoin, urlparse
from bs4 import BeautifulSoup


class AsyncWebCrawler:
    """Breadth-first, same-domain web crawler with bounded concurrency.

    Use it as an async context manager (`async with AsyncWebCrawler() as c:`),
    which opens and closes the shared aiohttp session.
    """

    def __init__(self, max_concurrent=10, delay=1):
        self.max_concurrent = max_concurrent
        self.delay = delay  # pause, in seconds, after each successful fetch (politeness)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.visited_urls = set()
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10),
            headers={'User-Agent': 'AsyncWebCrawler/1.0'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_url(self, url):
        """Fetch one URL.

        Returns:
            A dict with url/status/content/headers on HTTP 200, otherwise None.
            Each URL is fetched at most once per crawler.
        """
        if url in self.visited_urls:
            return None
        # Marked before awaiting, so concurrent callers don't fetch it twice.
        self.visited_urls.add(url)

        async with self.semaphore:
            try:
                print(f"🔍 爬取: {url}")
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        await asyncio.sleep(self.delay)  # polite delay
                        return {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'headers': dict(response.headers)
                        }
                    print(f"❌ HTTP {response.status}: {url}")
                    return None
            except Exception as e:
                print(f"🚨 错误 {url}: {e}")
                return None

    async def extract_links(self, html_content, base_url):
        """Return the absolute http(s) links in html_content, without #fragments.

        HTML parsing is CPU-bound, so it runs in a worker thread instead of
        stalling the other in-flight fetches on the event loop.
        """
        try:
            return await asyncio.to_thread(self._parse_links, html_content, base_url)
        except Exception as e:
            print(f"🔗 链接提取错误: {e}")
            return []

    @staticmethod
    def _parse_links(html_content, base_url):
        """Synchronously parse html_content and collect absolute http(s) links."""
        soup = BeautifulSoup(html_content, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            # Resolve relative hrefs, then drop the fragment: page#a and page#b are the same page.
            absolute_url, _fragment = urldefrag(urljoin(base_url, link['href']))
            # Keep only HTTP/HTTPS links
            if absolute_url.startswith(('http://', 'https://')):
                links.append(absolute_url)
        return links

    async def crawl_website(self, start_urls, max_pages=50):
        """Crawl breadth-first from start_urls, staying on the first URL's domain.

        Returns:
            The result dicts of at most `max_pages` successfully fetched pages.
        """
        start_urls = list(start_urls)
        crawl_queue = list(start_urls)
        # Every URL ever queued, for O(1) "already queued?" checks.
        queued = set(crawl_queue)
        crawled_data = []

        while crawl_queue and len(crawled_data) < max_pages:
            # Next batch: bounded by concurrency, queue length and the remaining page budget
            batch_size = min(self.max_concurrent, len(crawl_queue), max_pages - len(crawled_data))
            current_batch = crawl_queue[:batch_size]
            crawl_queue = crawl_queue[batch_size:]

            # Fetch the batch concurrently
            results = await asyncio.gather(
                *(self.fetch_url(url) for url in current_batch),
                return_exceptions=True,
            )

            for result in results:
                if not result or isinstance(result, Exception):
                    continue
                crawled_data.append(result)

                # Queue new same-domain links that were neither visited nor queued yet
                new_links = await self.extract_links(result['content'], result['url'])
                for link in new_links:
                    if (link not in self.visited_urls
                            and link not in queued
                            and self.is_same_domain(link, start_urls[0])):
                        crawl_queue.append(link)
                        queued.add(link)

        return crawled_data

    def is_same_domain(self, url1, url2):
        """Return True if both URLs have the same network location (host[:port])."""
        return urlparse(url1).netloc == urlparse(url2).netloc
# Web server application
class AsyncWebServer:
    """Minimal HTTP/1.1 server on asyncio streams: one request per connection.

    Route handlers are awaited with a request dict. A str return value is sent
    as the body of a 200 response; bytes are sent unchanged as a complete raw
    HTTP response.
    """

    _REASONS = {
        200: "OK",
        404: "Not Found",
        405: "Method Not Allowed",
        500: "Internal Server Error",
    }

    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port
        self.routes = {}
        self.middleware = []

    def route(self, path, methods=('GET',)):
        """Decorator registering the handler for `path` and the given HTTP methods."""
        def decorator(handler):
            self.routes[path] = {
                'handler': handler,
                'methods': methods
            }
            return handler
        return decorator

    def middleware_decorator(self, middleware_func):
        """Decorator appending a middleware; it may return a replacement request dict."""
        self.middleware.append(middleware_func)
        return middleware_func

    @classmethod
    def _format_response(cls, status, body):
        """Encode a complete response whose Content-Length counts bytes, not characters."""
        body_bytes = body.encode()
        head = (
            f"HTTP/1.1 {status} {cls._REASONS[status]}\r\n"
            "Content-Type: text/plain; charset=utf-8\r\n"
            f"Content-Length: {len(body_bytes)}\r\n"
            "Connection: close\r\n"
            "\r\n"
        )
        return head.encode() + body_bytes

    @staticmethod
    async def _read_headers(reader):
        """Read header lines into a dict, up to the blank line or end of stream."""
        headers = {}
        while True:
            line = await reader.readline()
            # b'' means the client closed the connection. Stop here; otherwise
            # readline() keeps returning b'' and the loop never ends.
            if line in (b'\r\n', b'\n', b''):
                return headers
            key, value = line.decode().split(':', 1)
            headers[key.strip()] = value.strip()

    async def handle_request(self, reader, writer):
        """Read one HTTP request, dispatch it, write the response, close the connection."""
        try:
            request_line = (await reader.readline()).decode().strip()
            if not request_line:
                return
            method, path, _ = request_line.split(' ', 2)
            headers = await self._read_headers(reader)

            # Build the request object
            request = {
                'method': method,
                'path': path,
                'headers': headers,
                'reader': reader
            }

            # Apply middleware
            for middleware in self.middleware:
                request = await middleware(request) or request

            # Routing. Error responses are built complete here, so they are
            # never re-wrapped as 200 OK.
            route_info = self.routes.get(path)
            if route_info is None:
                payload = self._format_response(404, "404 - Page Not Found")
            elif method not in route_info['methods']:
                payload = self._format_response(405, "")
            else:
                response = await route_info['handler'](request)
                if isinstance(response, str):
                    payload = self._format_response(200, response)
                elif isinstance(response, bytes):
                    payload = response
                else:
                    raise TypeError(
                        f"handler returned {type(response).__name__}, expected str or bytes"
                    )

            writer.write(payload)
            await writer.drain()
        except Exception as e:
            try:
                writer.write(self._format_response(500, f"Error: {str(e)}"))
                await writer.drain()
            except ConnectionError:
                pass  # the client is already gone; nothing left to report to
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                pass

    async def start_server(self):
        """Start listening and serve until cancelled."""
        server = await asyncio.start_server(
            self.handle_request,
            self.host,
            self.port
        )
        print(f"🌐 服务器启动: http://{self.host}:{self.port}")
        async with server:
            await server.serve_forever()
# Database connection-pool application
class AsyncDatabasePool:
    """Lazily grown pool of simulated database connections, capped at pool_size."""

    def __init__(self, connection_string, pool_size=10):
        self.connection_string = connection_string
        self.pool_size = pool_size
        # Idle connections waiting to be reused
        self.pool = asyncio.Queue(maxsize=pool_size)
        # Connections created so far (never exceeds pool_size)
        self.created_connections = 0
        # Connections currently checked out
        self.active_connections = 0

    async def create_connection(self):
        """Create a new simulated connection and return its name."""
        # Reserve the slot and take its number before suspending. If the name
        # were read after the await, concurrent creators would see the same
        # final counter value and return duplicate names.
        self.created_connections += 1
        number = self.created_connections
        await asyncio.sleep(0.1)  # simulate connection setup time
        return f"db_connection_{number}"

    async def get_connection(self):
        """Check out a connection: reuse an idle one, create one, or wait for one."""
        try:
            # Try an existing idle connection first
            connection = self.pool.get_nowait()
        except asyncio.QueueEmpty:
            # No idle connection: create one while under the limit...
            if self.created_connections < self.pool_size:
                connection = await self.create_connection()
            else:
                # ...otherwise wait until one is returned
                connection = await self.pool.get()
        self.active_connections += 1
        return connection

    async def return_connection(self, connection):
        """Put a checked-out connection back into the pool."""
        self.active_connections -= 1
        await self.pool.put(connection)

    async def execute_query(self, query):
        """Run a simulated query on a pooled connection, always returning the connection."""
        connection = await self.get_connection()
        try:
            print(f"🗃️ 执行查询: {query} (连接: {connection})")
            await asyncio.sleep(0.2)  # simulate query time
            return f"查询结果: {query}"
        finally:
            await self.return_connection(connection)

    def get_pool_stats(self):
        """Return total, active and idle connection counts."""
        return {
            "总连接数": self.created_connections,
            "活跃连接数": self.active_connections,
            "池中可用连接数": self.pool.qsize()
        }
# 实时数据处理应用
class AsyncRealTimeProcessor:
def __init__(self):
self.data_queue = asyncio.Queue()
self.processed_count = 0
self.error_count = 0
self.running = False
async def data_producer(self, data_source_name, count):
"""数据生产者 - 模拟实时数据流"""
for i in range(count):
data = {
'id': f"{data_source_name}_{i}",
'timestamp': time.time(),
'value': i * 10 + (i % 7), # 模拟数据
'source': data_source_name
}
await self.data_queue.put(data)
print(f"📨 生产数据: {data['id']}")
await asyncio.sleep(0.1) # 模拟数据产生间隔
print(f"✅ 数据源 {data_source_name} 完成")
async def data_processor(self, processor_id):
"""数据处理器"""
while self.running or not self.data_queue.empty():
try:
# 等待数据,设置超时避免无限等待
data = await asyncio.wait_for(
self.data_queue.get(),
timeout=1.0
)
# 处理数据
processed_data = await self.process_single_item(data)
if processed_data:
self.processed_count += 1
print(f"✨ 处理器{processor_id}完成: {data['id']} -> {processed_data['result']}")
except asyncio.TimeoutError:
if not self.running:
break
except Exception as e:
self.error_count += 1
print(f"❌ 处理器{processor_id}错误: {e}")
async def process_single_item(self, data):
"""处理单个数据项"""
try:
# 模拟数据处理逻辑
await asyncio.sleep(0.2) # 模拟处理时间
# 简单的数据转换
result = data['value'] * 2 + 1
return {
'original': data,
'result': result,
'processed_at': time.time()
}
except Exception as e:
print(f"🔥 处理数据 {data['id']} 时出错: {e}")
return None
async def start_processing(self, num_processors=3):
"""启动实时数据处理"""
self.running = True
# 启动多个数据处理器
processors = [
asyncio.create_task(self.data_processor(i))
for i in range(1, num_processors + 1)
]
return processors
def stop_processing(self):
"""停止处理"""
self.running = False
def get_stats(self):
"""获取处理统计"""
return {
"已处理": self.processed_count,
"错误数": self.error_count,
"队列大小": self.data_queue.qsize()
}
# WebSocket server application
class AsyncWebSocketServer:
    """Chat-style WebSocket server with a simple room system.

    ``clients`` holds every connected websocket. ``rooms`` maps a room name
    to the set of websockets that joined it. Rooms that become empty are
    deleted.
    """

    def __init__(self):
        self.clients = set()
        self.rooms = {}  # room name -> set of websockets

    async def register_client(self, websocket, path):
        """Track ``websocket`` for the lifetime of its connection.

        ``path`` is part of the handler signature but is not used here.
        """
        self.clients.add(websocket)
        print(f"👤 客户端连接: {websocket.remote_address}")
        try:
            await self.handle_client(websocket)
        except Exception as e:
            print(f"🚨 客户端处理错误: {e}")
        finally:
            # discard(): a failed broadcast may already have removed this client,
            # and remove() would raise KeyError from inside finally.
            self.clients.discard(websocket)
            self._remove_from_rooms(websocket)
            print(f"👋 客户端断开: {websocket.remote_address}")

    def _remove_from_rooms(self, websocket):
        """Drop ``websocket`` from every room and delete rooms left empty."""
        for room_name in list(self.rooms):
            members = self.rooms[room_name]
            members.discard(websocket)
            if not members:
                del self.rooms[room_name]

    async def handle_client(self, websocket):
        """Dispatch each incoming JSON message on its ``type`` field.

        Invalid JSON, or any error while handling a message, is reported
        back to the sender as a text message.
        """
        import json

        async for message in websocket:
            try:
                data = json.loads(message)
                message_type = data.get('type', 'message')
                if message_type == 'join_room':
                    await self.join_room(websocket, data['room'])
                elif message_type == 'leave_room':
                    await self.leave_room(websocket, data['room'])
                elif message_type == 'broadcast':
                    await self.broadcast_to_room(data['room'], data['message'])
                elif message_type == 'private':
                    # NOTE(review): send_private_message is not defined on this
                    # class, so this raises AttributeError. The handler below
                    # reports that back to the sender.
                    await self.send_private_message(websocket, data)
                else:
                    await self.broadcast_to_all(data)
            except json.JSONDecodeError:
                await websocket.send("错误: 无效的JSON格式")
            except Exception as e:
                await websocket.send(f"错误: {str(e)}")

    async def join_room(self, websocket, room_name):
        """Add ``websocket`` to ``room_name`` (creating the room) and confirm."""
        self.rooms.setdefault(room_name, set()).add(websocket)
        await websocket.send(f"已加入房间: {room_name}")
        print(f"🏠 客户端加入房间 {room_name}")

    async def leave_room(self, websocket, room_name):
        """Remove ``websocket`` from ``room_name``, deleting the room if it empties."""
        if room_name in self.rooms:
            self.rooms[room_name].discard(websocket)
            if not self.rooms[room_name]:
                del self.rooms[room_name]
        await websocket.send(f"已离开房间: {room_name}")

    async def broadcast_to_room(self, room_name, message):
        """Send ``message`` to every member of ``room_name``; drop members whose send fails."""
        members = self.rooms.get(room_name)
        if members is None:
            return
        disconnected = set()
        for client in members.copy():
            try:
                await client.send(f"[房间 {room_name}] {message}")
            except Exception:  # not bare: let cancellation propagate
                disconnected.add(client)
        # The room may have changed, or been deleted, while sends were awaited.
        current = self.rooms.get(room_name)
        if current is not None:
            current -= disconnected
            if not current:
                del self.rooms[room_name]

    async def broadcast_to_all(self, data):
        """Send ``data['message']`` to every client; forget clients whose send fails."""
        message = data.get('message', '空消息')
        disconnected = set()
        for client in self.clients.copy():
            try:
                await client.send(f"[广播] {message}")
            except Exception:  # not bare: let cancellation propagate
                disconnected.add(client)
        self.clients -= disconnected
        for client in disconnected:
            self._remove_from_rooms(client)

    def get_server_stats(self):
        """Return online-client count, room count and per-room member counts."""
        return {
            "在线客户端": len(self.clients),
            "活跃房间": len(self.rooms),
            "房间详情": {name: len(clients) for name, clients in self.rooms.items()}
        }
# API client application
class AsyncAPIClient:
    """Concurrency-limited HTTP client with request statistics.

    Use as ``async with AsyncAPIClient(base_url) as client:``. Entering
    opens an ``aiohttp.ClientSession`` and exiting closes it. At most
    ``max_concurrent`` requests run at once, gated by an ``asyncio.Semaphore``.
    """

    def __init__(self, base_url, max_concurrent=10):
        self.base_url = base_url.rstrip('/')
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None  # set by __aenter__
        self.request_stats = {
            "total": 0,
            "success": 0,
            "error": 0,
            "response_times": []  # seconds, one entry per request that got a response
        }

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'AsyncAPIClient/1.0'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def make_request(self, method, endpoint, **kwargs):
        """Send one request and return a result dict; never raises ``Exception``.

        Returns one of:
        - ``{"status", "data", "response_time"}`` for status < 400 with a JSON body;
        - ``{"status", "error", "response_time"}`` for status >= 400;
        - ``{"status": 0, "error", "response_time"}`` when any exception occurs,
          including a body that is not valid JSON.
        """
        async with self.semaphore:
            url = f"{self.base_url}{endpoint}"
            start_time = time.perf_counter()  # monotonic clock for durations
            try:
                self.request_stats["total"] += 1
                async with self.session.request(method, url, **kwargs) as response:
                    response_time = time.perf_counter() - start_time
                    self.request_stats["response_times"].append(response_time)
                    if response.status < 400:
                        # Decode before counting success, so an undecodable body
                        # is counted once, as an error, not as success and error.
                        data = await response.json()
                        self.request_stats["success"] += 1
                        return {
                            "status": response.status,
                            "data": data,
                            "response_time": response_time
                        }
                    self.request_stats["error"] += 1
                    return {
                        "status": response.status,
                        "error": f"HTTP {response.status}",
                        "response_time": response_time
                    }
            except Exception as e:
                self.request_stats["error"] += 1
                response_time = time.perf_counter() - start_time
                return {
                    "status": 0,
                    "error": str(e),
                    "response_time": response_time
                }

    async def batch_requests(self, requests):
        """Run ``{"method", "endpoint", "kwargs"?}`` request specs concurrently; results in order."""
        tasks = [
            self.make_request(req["method"], req["endpoint"], **req.get("kwargs", {}))
            for req in requests
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def get(self, endpoint, **kwargs):
        return await self.make_request("GET", endpoint, **kwargs)

    async def post(self, endpoint, **kwargs):
        return await self.make_request("POST", endpoint, **kwargs)

    async def put(self, endpoint, **kwargs):
        return await self.make_request("PUT", endpoint, **kwargs)

    async def delete(self, endpoint, **kwargs):
        return await self.make_request("DELETE", endpoint, **kwargs)

    def get_stats(self):
        """Return request counts, success rate and min/avg/max response times."""
        times = self.request_stats["response_times"]
        if times:
            avg_response_time = sum(times) / len(times)
            max_response_time = max(times)
            min_response_time = min(times)
        else:
            avg_response_time = max_response_time = min_response_time = 0
        total = self.request_stats["total"]
        success_rate = (self.request_stats["success"] / total * 100) if total > 0 else 0
        return {
            "总请求数": total,
            "成功数": self.request_stats["success"],
            "失败数": self.request_stats["error"],
            "成功率": f"{success_rate:.1f}%",
            "平均响应时间": f"{avg_response_time:.3f}s",
            "最大响应时间": f"{max_response_time:.3f}s",
            "最小响应时间": f"{min_response_time:.3f}s"
        }
# Application scenario demos
async def _demo_web_crawler():
    """Demo 1: crawl a few pages with ``AsyncWebCrawler`` (not defined in this section).

    Any failure, e.g. no network access, is printed and the demo is skipped.
    """
    print("🕷️ Web爬虫演示:")
    try:
        async with AsyncWebCrawler(max_concurrent=3, delay=0.5) as crawler:
            start_urls = ["http://httpbin.org/"]
            crawled_data = await crawler.crawl_website(start_urls, max_pages=3)
            print(f"✅ 爬取完成,获得 {len(crawled_data)} 个页面")
    except Exception as e:
        print(f"⚠️ 爬虫演示跳过: {e}")


async def _demo_db_pool():
    """Demo 2: run five queries concurrently through a 3-connection pool."""
    print("\n🗄️ 数据库连接池演示:")
    db_pool = AsyncDatabasePool("mock://database", pool_size=3)
    queries = [
        "SELECT * FROM users",
        "SELECT * FROM orders",
        "SELECT * FROM products",
        "SELECT * FROM categories",
        "SELECT * FROM reviews"
    ]
    await asyncio.gather(*(db_pool.execute_query(query) for query in queries))
    print(f"📊 连接池状态: {db_pool.get_pool_stats()}")


async def _demo_realtime_processing():
    """Demo 3: two producers feeding two processor tasks through a queue."""
    print("\n⚡ 实时数据处理演示:")
    processor = AsyncRealTimeProcessor()
    processing_tasks = await processor.start_processing(num_processors=2)
    try:
        producer_tasks = [
            asyncio.create_task(processor.data_producer("sensor_A", 5)),
            asyncio.create_task(processor.data_producer("sensor_B", 3))
        ]
        await asyncio.gather(*producer_tasks)
        # Give the processors time to drain what is left in the queue.
        await asyncio.sleep(2)
    finally:
        # Always stop the processors, even if a producer failed. While
        # ``running`` is True they keep polling the queue indefinitely.
        processor.stop_processing()
        await asyncio.gather(*processing_tasks)
    print(f"📈 处理统计: {processor.get_stats()}")


async def _demo_api_client():
    """Demo 4: a few concurrent calls against httpbin.org; failures skip the demo."""
    print("\n🌐 API客户端演示:")
    try:
        async with AsyncAPIClient("https://httpbin.org") as client:
            test_requests = [
                {"method": "GET", "endpoint": "/get"},
                {"method": "POST", "endpoint": "/post", "kwargs": {"json": {"test": "data"}}},
                {"method": "GET", "endpoint": "/delay/1"},
            ]
            results = await client.batch_requests(test_requests)
            # make_request reports exceptions as status 0, so require a real
            # (positive) HTTP status below 400 to count as success.
            successful_requests = [
                r for r in results
                if not isinstance(r, Exception) and 0 < r.get("status", 0) < 400
            ]
            print(f"✅ API请求完成: {len(successful_requests)}/{len(test_requests)} 成功")
            print(f"📊 客户端统计: {client.get_stats()}")
    except Exception as e:
        print(f"⚠️ API客户端演示跳过: {e}")


async def application_demos():
    """Run the four application-scenario demos one after another."""
    print("=== 异步应用场景演示 ===\n")
    await _demo_web_crawler()
    await _demo_db_pool()
    await _demo_realtime_processing()
    await _demo_api_client()


# Run all demos
asyncio.run(application_demos())
最佳实践
11.1 代码组织与架构
# Recommended async code architecture patterns
# 1. Service-layer architecture
class AsyncServiceBase:
    """Base class for async services used via ``async with``.

    Entering the context awaits :meth:`startup` and leaving it awaits
    :meth:`cleanup`. Subclasses override those hooks. ``metrics`` is a
    dict of named counters updated through :meth:`record_metric`.
    """

    def __init__(self):
        self.logger = self.setup_logger()
        self.metrics = {"requests": 0, "errors": 0}

    def setup_logger(self):
        """Return a logger named after the concrete class, set to INFO."""
        import logging

        named_logger = logging.getLogger(type(self).__name__)
        named_logger.setLevel(logging.INFO)
        return named_logger

    async def __aenter__(self):
        await self.startup()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup()

    async def startup(self):
        """Hook run on context entry; the base version only logs."""
        self.logger.info(f"启动 {self.__class__.__name__}")

    async def cleanup(self):
        """Hook run on context exit; the base version only logs."""
        self.logger.info(f"关闭 {self.__class__.__name__}")

    def record_metric(self, metric_name, value=1):
        """Add ``value`` to counter ``metric_name``, starting from 0 if it is new."""
        self.metrics[metric_name] = self.metrics.get(metric_name, 0) + value
# 2. Async factory pattern
class AsyncResourceFactory:
    """Coroutine constructors for simulated shared resources."""

    @staticmethod
    async def _simulated_connect(latency, handle):
        """Sleep for ``latency`` seconds (a fake connect), then return ``handle``."""
        await asyncio.sleep(latency)
        return handle

    @staticmethod
    async def create_database_connection():
        """Return a placeholder database connection after a 0.1 s fake connect."""
        return await AsyncResourceFactory._simulated_connect(0.1, "database_connection")

    @staticmethod
    async def create_redis_connection():
        """Return a placeholder Redis connection after a 0.05 s fake connect."""
        return await AsyncResourceFactory._simulated_connect(0.05, "redis_connection")

    @staticmethod
    async def create_message_queue():
        """Return a new, unbounded ``asyncio.Queue``."""
        message_queue = asyncio.Queue()
        return message_queue
# 3. Async singleton pattern
class AsyncSingleton:
    """Per-class singleton whose one-time initialization is asynchronous.

    Construct normally and enter with ``async with Cls() as inst``. The
    first entry awaits :meth:`_initialize` under a per-class lock and
    registers that object. Every later entry, from any instance of the same
    class, returns the registered object.
    """

    _instances = {}  # class -> its initialized instance (shared by all subclasses)
    _locks = {}  # class -> asyncio.Lock guarding that class's one-time _initialize()

    def __new__(cls, *args, **kwargs):
        # Create the lock only once per class. The previous version replaced it
        # on every construction before initialization finished, so a second
        # ``async with`` could take a fresh lock and run _initialize() again.
        if cls not in cls._locks:
            cls._locks[cls] = asyncio.Lock()
        return super().__new__(cls)

    async def __aenter__(self):
        cls = type(self)
        async with self._locks[cls]:
            if cls not in self._instances:
                await self._initialize()
                self._instances[cls] = self
        return self._instances[cls]

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def _initialize(self):
        """One-time async setup; here a simulated 0.1 s initialization."""
        await asyncio.sleep(0.1)
        print(f"初始化 {self.__class__.__name__}")
# 4. Configuration management
class AsyncConfig:
    """Key/value configuration store serialized by one ``asyncio.Lock``.

    Every operation takes the lock, so reads made while a load is in
    progress wait until that load has finished.
    """

    def __init__(self):
        self._config = {}
        self._config_lock = asyncio.Lock()

    async def load_config(self, config_source):
        """Merge ``config_source`` (anything ``dict.update`` accepts) after a simulated 0.1 s load."""
        async with self._config_lock:
            await asyncio.sleep(0.1)  # stand-in for reading a file or fetching over the network
            self._config.update(config_source)

    async def get_config(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        async with self._config_lock:
            found = self._config.get(key, default)
        return found

    async def set_config(self, key, value):
        """Store ``value`` under ``key``, overwriting any previous value."""
        async with self._config_lock:
            self._config.update({key: value})
# 5. Error recovery and retry
class AsyncRetryHandler:
    """Namespace for the :meth:`retry` decorator factory."""

    @staticmethod
    def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
        """Return a decorator that retries an async function on failure.

        Args:
            max_attempts: total number of calls, including the first. Must be >= 1.
            delay: seconds to wait before the first retry.
            backoff: multiplier applied to the wait after each retry.
            exceptions: exception class, or tuple of classes, that trigger a
                retry. Anything else propagates immediately.

        Raises:
            ValueError: if ``max_attempts`` is less than 1. Previously such a
                value ended in ``raise None`` and a confusing TypeError at call time.

        The decorated coroutine re-raises the last exception once all
        attempts have failed.
        """
        if max_attempts < 1:
            raise ValueError(f"max_attempts must be >= 1, got {max_attempts!r}")

        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                current_delay = delay
                for attempt in range(1, max_attempts + 1):
                    try:
                        return await func(*args, **kwargs)
                    except exceptions as e:
                        if attempt == max_attempts:
                            print(f"❌ 所有{max_attempts}次尝试都失败")
                            raise
                        print(f"⚠️ 第{attempt}次尝试失败: {e}, {current_delay}秒后重试")
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
            return wrapper
        return decorator
# 6. Chained async context managers
class AsyncContextManager:
    """Enter several async context managers as one; exit them in reverse order.

    ``async with AsyncContextManager(a, b) as (ra, rb):`` yields the list of
    each manager's ``__aenter__`` result. On exit, every entered manager's
    ``__aexit__`` receives the exception info of the block. Errors raised by
    an ``__aexit__`` are printed, not propagated. Suppression requests
    (truthy ``__aexit__`` returns) are ignored, so the block's exception
    always propagates.
    """

    def __init__(self, *managers):
        self.managers = managers
        self.entered = []  # (manager, __aenter__ result) pairs, in entry order

    async def __aenter__(self):
        try:
            for manager in self.managers:
                result = await manager.__aenter__()
                self.entered.append((manager, result))
        except BaseException as exc:
            # Unwind the managers already entered, including on cancellation,
            # and tell them why. Then let the original error propagate.
            await self._cleanup(type(exc), exc, exc.__traceback__)
            raise
        return [result for _, result in self.entered]

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._cleanup(exc_type, exc_val, exc_tb)

    async def _cleanup(self, exc_type=None, exc_val=None, exc_tb=None):
        """Exit entered managers in reverse order, forwarding the exception info."""
        for manager, _ in reversed(self.entered):
            try:
                await manager.__aexit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                print(f"清理时出错: {e}")
        self.entered.clear()
11.2 性能和内存管理
# Memory-management best practices
class AsyncMemoryManager:
    """Helpers demonstrating bounded-memory processing patterns."""

    @staticmethod
    async def process_large_dataset_efficiently(dataset, chunk_size=1000, collect_every=10000):
        """Process ``dataset`` (a sliceable sequence) in chunks of ``chunk_size``.

        After at least ``collect_every`` more items have been processed, the
        coroutine yields to the event loop and forces a garbage-collection
        pass. The defaults reproduce the original fixed behaviour.

        Returns:
            The number of items processed, i.e. ``len(dataset)``.
        """
        import gc

        processed_count = 0
        last_collect = 0  # processed_count at the previous yield/gc pass
        for start in range(0, len(dataset), chunk_size):
            chunk = dataset[start:start + chunk_size]
            await AsyncMemoryManager._process_chunk(chunk)
            processed_count += len(chunk)
            # Compare against the last pass instead of testing
            # ``processed_count % collect_every == 0``. The modulo test never
            # fires when chunk_size does not divide collect_every.
            if processed_count - last_collect >= collect_every:
                last_collect = processed_count
                await asyncio.sleep(0)
                gc.collect()  # explicit collection between batches
        return processed_count

    @staticmethod
    async def _process_chunk(chunk):
        """Simulate processing one chunk; returns each item doubled."""
        await asyncio.sleep(0.01)  # simulated processing time
        return [item * 2 for item in chunk]

    @staticmethod
    async def memory_efficient_file_processing(file_path):
        """Stream ``file_path`` line by line with aiofiles and return the line count.

        Lines are processed one at a time instead of loading the whole file.
        Yields to the event loop every 1000 lines. Returns 0, after printing
        an install hint, when aiofiles is not installed.
        """
        try:
            import aiofiles
        except ImportError:
            print("需要安装 aiofiles: pip install aiofiles")
            return 0
        line_count = 0
        async with aiofiles.open(file_path, 'r') as file:
            async for line in file:
                await AsyncMemoryManager._process_line(line)  # result discarded: demo only
                line_count += 1
                if line_count % 1000 == 0:
                    await asyncio.sleep(0)
        return line_count

    @staticmethod
    async def _process_line(line):
        """Return ``line`` stripped and upper-cased."""
        return line.strip().upper()
# Resource cleanup management
class AsyncResourceManager:
    """Registry of resources cleaned up in reverse (LIFO) order on ``async with`` exit."""

    def __init__(self):
        self.resources = []  # (resource, cleanup_func or None), in registration order
        self.cleanup_tasks = []  # NOTE(review): not used anywhere in this class

    async def add_resource(self, resource, cleanup_func=None):
        """Register ``resource`` with an optional sync or async ``cleanup_func(resource)``."""
        self.resources.append((resource, cleanup_func))

    async def cleanup_all(self):
        """Run every cleanup callback in reverse registration order, then forget all resources.

        ``cleanup_func`` may be sync or async. If calling it returns an
        awaitable (a coroutine function, or a lambda/partial wrapping one),
        that awaitable is awaited. A failure in one cleanup is printed and
        does not stop the rest.
        """
        import inspect

        for resource, cleanup_func in reversed(self.resources):
            try:
                if cleanup_func:
                    # Check the result instead of using asyncio.iscoroutinefunction,
                    # which is deprecated and misses callables that merely return a
                    # coroutine; those coroutines were never awaited.
                    outcome = cleanup_func(resource)
                    if inspect.isawaitable(outcome):
                        await outcome
                print(f"✅ 清理资源: {resource}")
            except Exception as e:
                print(f"❌ 清理资源失败: {resource}, 错误: {e}")
        self.resources.clear()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup_all()
11.3 调试和测试
# Async debugging tools
class AsyncDebugger:
    """Debugging helpers for coroutines and running tasks."""

    @staticmethod
    def trace_coroutine(name=None):
        """Decorator factory: print start, finish or failure and elapsed time of each call.

        ``name`` overrides the printed label, which defaults to
        ``module.function``. Exceptions are printed and re-raised unchanged.
        """
        def decorator(func):
            func_name = name or f"{func.__module__}.{func.__name__}"  # computed once per function

            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                print(f"🔍 开始执行: {func_name}")
                # perf_counter is monotonic, unlike time.time(), so durations
                # are not distorted by wall-clock adjustments.
                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    end_time = time.perf_counter()
                    print(f"❌ 执行失败: {func_name} (耗时: {end_time - start_time:.4f}s), 错误: {e}")
                    raise
                end_time = time.perf_counter()
                print(f"✅ 完成执行: {func_name} (耗时: {end_time - start_time:.4f}s)")
                return result
            return wrapper
        return decorator

    @staticmethod
    async def monitor_task_status(interval=5):
        """Every ``interval`` seconds, print the number of unfinished tasks and the first 5.

        Runs forever, so cancel the task to stop it. The monitor's own task
        is included in the count.
        """
        while True:
            tasks = [task for task in asyncio.all_tasks() if not task.done()]
            print(f"📊 当前活跃任务数: {len(tasks)}")
            for i, task in enumerate(tasks[:5]):  # show at most 5
                print(f" {i+1}. {task.get_name()}: {task}")
            await asyncio.sleep(interval)
# Async testing tools
class AsyncTester:
    """Simple performance and stress-testing helpers for coroutine functions."""

    @staticmethod
    async def run_performance_test(coroutine_func, *args, iterations=100):
        """Await ``coroutine_func(*args)`` ``iterations`` times sequentially and print timing stats.

        Failed runs are counted and printed but excluded from the timings.
        """
        print(f"🧪 开始性能测试: {coroutine_func.__name__} ({iterations}次)")
        times = []
        errors = 0
        for i in range(iterations):
            start_time = time.perf_counter()  # monotonic clock for durations
            try:
                await coroutine_func(*args)
                times.append(time.perf_counter() - start_time)
            except Exception as e:
                errors += 1
                print(f"❌ 第{i+1}次测试出错: {e}")
        if times:
            avg_time = sum(times) / len(times)
            print("📈 性能测试结果:")
            print(f" 成功: {len(times)}/{iterations}")
            print(f" 失败: {errors}/{iterations}")
            print(f" 平均时间: {avg_time:.4f}s")
            print(f" 最短时间: {min(times):.4f}s")
            print(f" 最长时间: {max(times):.4f}s")
        else:
            print("❌ 所有测试都失败了")

    @staticmethod
    async def stress_test(coroutine_func, concurrent_count=100):
        """Run ``concurrent_count`` copies of ``coroutine_func()`` at once and print a summary.

        Returns the ``asyncio.gather(..., return_exceptions=True)`` results,
        where failures appear as exception objects. If this coroutine is
        cancelled, ``gather`` cancels the outstanding load tasks as well.
        """
        print(f"💪 开始压力测试: {concurrent_count}个并发任务")
        start_time = time.perf_counter()
        tasks = [asyncio.create_task(coroutine_func()) for _ in range(concurrent_count)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        elapsed = time.perf_counter() - start_time
        # BaseException: a cancelled task yields CancelledError, which does
        # not subclass Exception, and it must not be counted as a success.
        successful = [r for r in results if not isinstance(r, BaseException)]
        failed = len(results) - len(successful)
        print("🏁 压力测试完成:")
        print(f" 总耗时: {elapsed:.2f}s")
        print(f" 成功: {len(successful)}/{concurrent_count}")
        print(f" 失败: {failed}/{concurrent_count}")
        return results
引用链接
[1] 核心概念深入解析: #核心概念深入解析
[2] 关键语法详解: #关键语法详解
[3] 事件循环机制: #事件循环机制
[4] 协程生命周期: #协程生命周期
[5] 并发模式对比: #并发模式对比
[6] 常用编程模式: #常用编程模式
[7] 高级特性详解: #高级特性详解
[8] 异常处理机制: #异常处理机制
[9] 性能优化策略: #性能优化策略
[10] 实际应用场景: #实际应用场景
[11] 最佳实践: #最佳实践
[12] 生态系统与第三方库: #生态系统与第三方库
评论区