Python async: Asynchronous Programming

Administrator
2025-09-08

 


A Complete, Detailed Guide to Asynchronous Programming in Python (asyncio)

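Contents

  1. Core Concepts in Depth
  2. Key Syntax Explained
  3. Event Loop Mechanics
  4. Coroutine Lifecycle
  5. Concurrency Models Compared
  6. Common Programming Patterns
  7. Advanced Features
  8. Exception Handling
  9. Performance Optimization Strategies
  10. Real-World Use Cases
  11. Best Practices
  12. Ecosystem and Third-Party Libraries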


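Core Concepts in Depth

1.1 Fundamentals of Asynchronous Programming

What is asynchronous programming?

  • Asynchronous programming lets a program keep running other tasks while it waits for an operation to finish

  • It is implemented through cooperative multitasking rather than preemptive multitasking

  • Concurrency happens inside a single thread, which avoids most of the complex synchronization problems between threads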
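
To make the cooperative model concrete, here is a minimal sketch. The names `worker` and `demo` are illustrative assumptions, not part of the original article. Two coroutines share one thread and take turns each time one of them reaches `await asyncio.sleep(0)`, which simply hands control back to the event loop.

```python
import asyncio

async def worker(name, steps, log):
    """Hypothetical worker: record a step, then yield control to the loop."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        # await is the point where this coroutine voluntarily gives up control
        await asyncio.sleep(0)

async def demo():
    log = []
    # gather schedules both workers on the same single-threaded event loop
    await asyncio.gather(worker("A", 2, log), worker("B", 2, log))
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))  # the steps of A and B appear interleaved
```

No locks are needed around `log` here because only one coroutine runs at any moment, and switches happen only at `await`.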

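Concurrency vs. parallelism

Concurrency: multiple tasks take turns executing, giving the impression that they run at the same time
┌─────┐ ┌─────┐ ┌─────┐
│Task1│ │Task2│ │Task1│  time →
└─────┘ └─────┘ └─────┘

Parallelism: multiple tasks genuinely execute at the same time
┌─────┐
│Task1│  CPU core 1
└─────┘
┌─────┐
│Task2│  CPU core 2
└─────┘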

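1.2 Understanding Coroutines in Depth

What a coroutine is

  • A coroutine is a function whose execution can be paused and resumed

  • It preserves its execution state, including local variables and the current position in the code

  • It gives up control voluntarily at its suspension points (each await)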
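
The pause-and-resume behaviour can be observed without an event loop by driving a coroutine by hand. The sketch below is for illustration only: `Pause` and `greet` are hypothetical names, and real code would let asyncio call `send()` for you.

```python
import inspect

class Pause:
    """Hypothetical awaitable that suspends the awaiting coroutine exactly once."""
    def __await__(self):
        yield "paused"  # the suspension point

async def greet():
    x = 1            # a local variable that must survive the pause
    await Pause()
    return x + 1

coro = greet()
state_before = inspect.getcoroutinestate(coro)   # created, not started

signal = coro.send(None)                         # run until the first suspension
state_paused = inspect.getcoroutinestate(coro)   # suspended, locals preserved

try:
    coro.send(None)                              # resume after the await
except StopIteration as stop:
    result = stop.value                          # the coroutine's return value

state_after = inspect.getcoroutinestate(coro)    # finished

if __name__ == "__main__":
    print(state_before, signal, state_paused, result, state_after)
```

The return value travels out of the coroutine inside `StopIteration`, which is why the event loop, not user code, normally handles this step.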

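Coroutine state transitions (state names as reported by inspect.getcoroutinestate)

Created → Running → Suspended → Resumed → Finished
  ↓     ↓      ↓      ↓      ↓
CORO_CREATED → CORO_RUNNING → CORO_SUSPENDED → CORO_RUNNING → CORO_CLOSED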

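Coroutine object attributes

import asyncio

async def example():
    return "Hello"

# Create the coroutine object (its body has not started running yet)
coro = example()
print(f"Coroutine object: {coro}")
print(f"Coroutine name: {coro.__name__}")
print(f"Currently running: {coro.cr_running}")  # False until something drives it

# The coroutine must be run to obtain its result
result = asyncio.run(coro)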

Key Syntax Explained

2.1 async/await Syntax Rules

Rules for async def

# ✅ Correct usage
async def valid_coroutine():
    return "valid"

async def with_await():
    result = await asyncio.sleep(1)  # await is allowed here; asyncio.sleep returns None
    return result

async def with_return():
    return "a coroutine can return a value"

# ✅ Asynchronous generator
async def async_generator():
    for i in range(3):
        yield i
        await asyncio.sleep(0.1)

# ❌ Incorrect usage
async def invalid():
    yield from range(3)  # SyntaxError: yield from is not allowed inside an async function

def sync_function():
    result = await something()  # SyntaxError: await is only allowed inside an async function

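Rules for await

import asyncio

async def another_coroutine():
    return "done"

async def awaitable_example():
    # ✅ Objects that can be awaited
    await asyncio.sleep(1)          # coroutine returned by a library function
    await another_coroutine()       # coroutine object from another coroutine function

    future = asyncio.get_running_loop().create_future()
    future.set_result("ready")      # an unresolved Future would block forever
    await future                    # Future object

    task = asyncio.create_task(another_coroutine())
    await task                      # Task object (build it with create_task, not Task())

    # ❌ Objects that cannot be awaited
    # await time.sleep(1)           # ordinary function; it returns None, so this raises TypeError
    # await "string"                # ordinary object; raises TypeError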

2.2 Coroutine Functions vs. Coroutine Objects

import asyncio

async def my_coroutine():
    return "Hello World"

# Coroutine function: the definition
print(type(my_coroutine))  # <class 'function'>

# Coroutine object: what calling the coroutine function returns
coro_obj = my_coroutine()
print(type(coro_obj))      # <class 'coroutine'>

# Run the coroutine object
result = asyncio.run(coro_obj)
print(result)              # "Hello World"

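Event Loop Mechanics

3.1 How the Event Loop Works

Core responsibilities of the event loop

  1. Scheduling coroutines: deciding which coroutine runs next

  2. Handling I/O: monitoring I/O operations for completion

  3. Running callbacks: executing timers and callback functions

  4. Managing tasks: creating, cancelling, and monitoring tasks

Event loop execution flow

1. Check for coroutines that are ready to run
2. Run each ready coroutine until it reaches an await that suspends it
3. Process I/O events and callbacks
4. Check timers
5. Repeat the steps above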
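
The scheduling part of this flow can be sketched with a toy loop built on plain generators. This is a deliberately simplified illustration, not how asyncio is implemented: it has no I/O polling and no timers, and `task` and `run_toy_loop` are hypothetical names.

```python
from collections import deque

def task(name, steps, log):
    """Hypothetical task: each bare yield stands in for an await."""
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # hand control back to the loop

def run_toy_loop(tasks):
    ready = deque(tasks)              # queue of tasks that are ready to run
    while ready:
        current = ready.popleft()     # pick the next ready task
        try:
            next(current)             # run it until its next suspension point
        except StopIteration:
            continue                  # finished: do not reschedule
        ready.append(current)         # still alive: back of the queue

log = []
run_toy_loop([task("A", 2, log), task("B", 1, log)])

if __name__ == "__main__":
    print(log)
```

A real event loop additionally parks tasks that are waiting on sockets or timers and moves them back to the ready queue only when the awaited event occurs.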

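3.2 Event Loop Operations in Detail

import asyncio

# Get the currently running event loop
async def loop_info():
    loop = asyncio.get_running_loop()
    print(f"Loop is running: {loop.is_running()}")
    print(f"Loop is closed: {loop.is_closed()}")
    
    # Schedule a callback for the next loop iteration
    loop.call_soon(callback_function)
    
    # Schedule a callback after a delay (in seconds)
    loop.call_later(5, delayed_callback)
    
    # Schedule at an absolute time. call_at uses the loop's own
    # monotonic clock (loop.time()), not time.time()
    loop.call_at(loop.time() + 10, timed_callback)

def callback_function():
    print("Callback run as soon as possible")

def delayed_callback():
    print("Callback run after 5 seconds")

def timed_callback():
    print("Callback run at the scheduled time")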

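3.3 Ways to Run a Coroutine

import asyncio

async def main():
    print("Main coroutine running")

# Option 1: the recommended modern way
asyncio.run(main())

# Option 2: manage the event loop manually (not recommended)
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Option 3: get the loop that is already running
async def in_running_loop():
    loop = asyncio.get_running_loop()
    # Work with the already running loop here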

Coroutine Lifecycle

4.1 Coroutine State Management

import asyncio
import inspect

async def lifecycle_demo():
    print("Coroutine started")
    await asyncio.sleep(1)
    print("Coroutine about to finish")
    return "done"

async def monitor_coroutine():
    # Create the coroutine object
    coro = lifecycle_demo()
    
    print(f"Coroutine state: {inspect.getcoroutinestate(coro)}")
    # Output: CORO_CREATED
    
    # Wrap it in a task
    task = asyncio.create_task(coro)
    print(f"Task created: {task}")
    print(f"Task done: {task.done()}")
    
    # Wait for completion
    result = await task
    print(f"Task result: {result}")
    print(f"Task done: {task.done()}")

asyncio.run(monitor_coroutine())

4.2 Tasks in Detail

import asyncio

async def task_operations():
    # Create tasks
    task1 = asyncio.create_task(slow_operation(1))
    task2 = asyncio.create_task(slow_operation(2))
    
    # Task attributes
    print(f"Task name: {task1.get_name()}")
    print(f"Task cancelled: {task1.cancelled()}")
    print(f"Task done: {task1.done()}")
    
    # Get the task results (waits until they complete)
    try:
        result1 = await task1
        result2 = await task2
        print(f"Results: {result1}, {result2}")
    except asyncio.CancelledError:
        print("Task was cancelled")

async def slow_operation(op_id):
    await asyncio.sleep(2)
    return f"operation {op_id} finished"

# Task cancellation example
async def cancellation_demo():
    task = asyncio.create_task(slow_operation(1))
    
    # Cancel after waiting 1 second
    await asyncio.sleep(1)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled successfully")

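Concurrency Models Compared

5.1 Detailed Comparison of Four Concurrency Models

| Feature              | Synchronous  | Multithreading                   | Multiprocessing               | Async I/O                |
|----------------------|--------------|----------------------------------|-------------------------------|--------------------------|
| Execution model      | Sequential   | Preemptive multitasking          | True parallelism              | Cooperative multitasking |
| Memory overhead      | Minimal      | Medium (8 MB per thread)         | Fairly large                  |                          |
| CPU overhead         | Minimal      | Medium (context switching)       | High (inter-process comms)    |                          |
| Suitable workloads   | Simple tasks | I/O-bound                        | CPU-bound                     | I/O-bound                |
| Concurrency level    | 1            | Limited (typically <1000)        | Limited (number of CPU cores) | Very high (>10000)       |
| Data sharing         | N/A          | Simple but needs synchronization | Complex (IPC)                 | Simple (single thread)   |
| Debugging difficulty | Easy         | Hard (race conditions)           | Hard                          | Moderate                 |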

5.2 Performance Comparison Example

import asyncio
import concurrent.futures
import time

import aiohttp   # third-party
import requests  # third-party

# Synchronous version
def sync_fetch(url):
    response = requests.get(url)
    return len(response.content)

def sync_main(urls):
    start = time.time()
    results = [sync_fetch(url) for url in urls]
    end = time.time()
    print(f"Synchronous version took: {end - start:.2f}s")
    return results

# Multithreaded version
def thread_main(urls):
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(sync_fetch, urls))
    end = time.time()
    print(f"Multithreaded version took: {end - start:.2f}s")
    return results

# Asynchronous version
async def async_fetch(session, url):
    async with session.get(url) as response:
        content = await response.read()
        return len(content)

async def async_main(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    end = time.time()
    print(f"Asynchronous version took: {end - start:.2f}s")
    return results

# Test URLs
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]

if __name__ == "__main__":
    sync_main(urls)
    thread_main(urls)
    asyncio.run(async_main(urls))

# Expected results:
# Synchronous version: ~5s (5 requests × 1s delay)
# Multithreaded version: ~1s (requests overlap while waiting)
# Asynchronous version: ~1s (concurrent execution, lower overhead)
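
The same effect can be reproduced without network access. The sketch below swaps the HTTP requests for `asyncio.sleep`, so it is a simulation under that assumption rather than a benchmark. `fake_request`, `sequential`, `concurrent`, and `timed` are hypothetical helper names.

```python
import asyncio
import time

async def fake_request(delay):
    """Stand-in for a network call: just waits for `delay` seconds."""
    await asyncio.sleep(delay)
    return delay

async def sequential(delays):
    # Each request starts only after the previous one has finished
    return [await fake_request(d) for d in delays]

async def concurrent(delays):
    # All requests wait at the same time on one event loop
    return await asyncio.gather(*(fake_request(d) for d in delays))

def timed(coro_fn, delays):
    """Run coro_fn(delays) to completion and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coro_fn(delays))
    return time.perf_counter() - start

if __name__ == "__main__":
    delays = [0.2] * 5
    print(f"sequential: {timed(sequential, delays):.2f}s")
    print(f"concurrent: {timed(concurrent, delays):.2f}s")
```

With five equal delays, the sequential run takes roughly the sum of the delays, while the concurrent run takes roughly the longest single delay.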

Common Programming Patterns

6.1 Concurrent Execution Patterns

Pattern 1: wait for all tasks to finish

import asyncio

async def gather_pattern():
    # Wait for every task to finish and collect all results
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3),
        return_exceptions=True  # return exceptions as results instead of raising
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

async def fetch_data(id):
    await asyncio.sleep(id)
    if id == 2:
        raise ValueError(f"task {id} failed")
    return f"data {id}"

Pattern 2: process results in completion order

async def as_completed_pattern():
    tasks = [
        asyncio.create_task(fetch_data(3)),
        asyncio.create_task(fetch_data(1)),
        asyncio.create_task(fetch_data(2))
    ]
    
    # Handle results in the order the tasks finish
    for completed_task in asyncio.as_completed(tasks):
        try:
            result = await completed_task
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Error: {e}")

Pattern 3: wait for the first task to finish

async def wait_pattern():
    tasks = [
        asyncio.create_task(fetch_data(3)),
        asyncio.create_task(fetch_data(1)),
        asyncio.create_task(fetch_data(2))
    ]
    
    # Wait until the first task completes
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # Handle the completed tasks
    for task in done:
        result = await task
        print(f"First to complete: {result}")
    
    # Cancel the remaining tasks
    for task in pending:
        task.cancel()

6.2 The Producer-Consumer Pattern in Detail

import asyncio
import random

class AsyncProducerConsumer:
    def __init__(self, queue_size=10):
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.running = True
    
    async def producer(self, name, item_count):
        """Producer coroutine"""
        for i in range(item_count):
            # Simulate production time
            await asyncio.sleep(random.uniform(0.1, 0.5))
            
            item = f"{name}-item-{i}"
            await self.queue.put(item)
            print(f"🏭 {name} produced: {item} (queue size: {self.queue.qsize()})")
        
        print(f"✅ Producer {name} finished")
    
    async def consumer(self, name):
        """Consumer coroutine"""
        while self.running or not self.queue.empty():
            try:
                # Use a timeout so the consumer never waits forever
                item = await asyncio.wait_for(
                    self.queue.get(), 
                    timeout=1.0
                )
                
                # Simulate processing time
                await asyncio.sleep(random.uniform(0.2, 0.8))
                
                print(f"🔧 {name} processed: {item} (queue remaining: {self.queue.qsize()})")
                
                # Mark the item as done so queue.join() can return
                self.queue.task_done()
                
            except asyncio.TimeoutError:
                if not self.running:
                    break
                continue
        
        print(f"✅ Consumer {name} finished")
    
    async def run_simulation(self):
        # Create the producer and consumer tasks
        tasks = [
            # 2 producers
            asyncio.create_task(self.producer("Producer-1", 5)),
            asyncio.create_task(self.producer("Producer-2", 3)),
            
            # 3 consumers
            asyncio.create_task(self.consumer("Consumer-A")),
            asyncio.create_task(self.consumer("Consumer-B")),
            asyncio.create_task(self.consumer("Consumer-C"))
        ]
        
        # Wait for all producers to finish
        await asyncio.gather(*tasks[:2])
        print("📢 All producers finished, waiting for consumers to drain the queue...")
        
        # Wait until the queue is empty and every item is marked done
        await self.queue.join()
        
        # Tell the consumers to stop
        self.running = False
        
        # Wait for all consumers to finish
        await asyncio.gather(*tasks[2:])

# Run the example
async def main():
    simulator = AsyncProducerConsumer(queue_size=5)
    await simulator.run_simulation()

asyncio.run(main())

6.3 Chained Coroutine Calls

import asyncio

class DataPipeline:
    """Example data-processing pipeline"""
    
    async def fetch_raw_data(self, source_id):
        """Step 1: fetch the raw data"""
        print(f"📥 Fetching data source {source_id}")
        await asyncio.sleep(0.5)  # simulate a network request
        return {"source_id": source_id, "raw_data": f"raw_{source_id}"}
    
    async def validate_data(self, data):
        """Step 2: validate the data"""
        print(f"✅ Validating data {data['source_id']}")
        await asyncio.sleep(0.2)  # simulate validation
        
        if data["source_id"] % 3 == 0:  # simulate a validation failure
            raise ValueError(f"data source {data['source_id']} failed validation")
        
        data["validated"] = True
        return data
    
    async def transform_data(self, data):
        """Step 3: transform the data"""
        print(f"🔄 Transforming data {data['source_id']}")
        await asyncio.sleep(0.3)  # simulate the transformation
        
        data["transformed_data"] = data["raw_data"].upper()
        return data
    
    async def save_data(self, data):
        """Step 4: save the data"""
        print(f"💾 Saving data {data['source_id']}")
        await asyncio.sleep(0.1)  # simulate saving
        
        data["saved"] = True
        return data
    
    async def process_single_source(self, source_id):
        """Run the full pipeline for a single data source"""
        try:
            # Chained calls: each step awaits the previous step's result
            data = await self.fetch_raw_data(source_id)
            data = await self.validate_data(data)
            data = await self.transform_data(data)
            data = await self.save_data(data)
            
            print(f"✨ Data source {source_id} processed")
            return data
            
        except Exception as e:
            print(f"❌ Data source {source_id} failed: {e}")
            return None
    
    async def process_multiple_sources(self, source_ids):
        """Process several data sources concurrently"""
        print(f"🚀 Processing {len(source_ids)} data sources")
        
        # Run the pipeline for every source concurrently
        results = await asyncio.gather(
            *[self.process_single_source(sid) for sid in source_ids],
            return_exceptions=True
        )
        
        # Summarize the results
        successful = [r for r in results if r is not None and not isinstance(r, Exception)]
        failed = len(results) - len(successful)
        
        print(f"📊 Finished: {len(successful)} succeeded, {failed} failed")
        return successful

# Usage example
async def main():
    pipeline = DataPipeline()
    
    # Process data sources 1 through 6
    results = await pipeline.process_multiple_sources(range(1, 7))
    
    print(f"\n📈 Final result: {len(results)} data sources processed successfully")

asyncio.run(main())

Advanced Features

7.1 Asynchronous Iterators and Asynchronous Generators

import asyncio

# Asynchronous iterator class
class AsyncRange:
    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        
        current = self.start
        self.start += 1
        
        # Simulate an asynchronous operation
        await asyncio.sleep(0.1)
        return current

# Asynchronous generator
async def async_fibonacci(n):
    """Asynchronous Fibonacci sequence generator"""
    a, b = 0, 1
    count = 0
    
    while count < n:
        yield a
        a, b = b, a + b
        count += 1
        # Pause after each value so other coroutines can run
        await asyncio.sleep(0.05)

# Asynchronous data stream processor
class AsyncDataStream:
    def __init__(self, data_source):
        self.data_source = data_source
    
    async def __aiter__(self):
        for item in self.data_source:
            # Simulate asynchronous data processing
            await asyncio.sleep(0.1)
            processed = await self.process_item(item)
            yield processed
    
    async def process_item(self, item):
        """Process a single item asynchronously"""
        await asyncio.sleep(0.05)  # simulate processing time
        return f"processed_{item}"

# Usage example
async def async_iteration_demo():
    print("=== Asynchronous iterator example ===")
    async for num in AsyncRange(1, 5):
        print(f"Async iterator produced: {num}")
    
    print("\n=== Asynchronous generator example ===")
    async for fib in async_fibonacci(8):
        print(f"Fibonacci number: {fib}")
    
    print("\n=== Asynchronous list comprehension ===")
    squares = [x**2 async for x in AsyncRange(1, 6)]
    print(f"Squares generated asynchronously: {squares}")
    
    print("\n=== Asynchronous data stream processing ===")
    stream = AsyncDataStream(['a', 'b', 'c', 'd'])
    async for processed in stream:
        print(f"Stream result: {processed}")

asyncio.run(async_iteration_demo())

7.2 Asynchronous Context Managers

import asyncio
import aiofiles  # third-party

# Custom asynchronous context manager
class AsyncDatabaseConnection:
    def __init__(self, db_url):
        self.db_url = db_url
        self.connection = None
    
    async def __aenter__(self):
        print(f"🔌 Connecting to database: {self.db_url}")
        await asyncio.sleep(0.1)  # simulate connection time
        self.connection = f"connection_to_{self.db_url}"
        return self.connection
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("📴 Closing database connection")
        await asyncio.sleep(0.05)  # simulate close time
        self.connection = None
        
        if exc_type:
            print(f"⚠️ Exception occurred: {exc_type.__name__}: {exc_val}")
        
        # Returning False means the exception is not suppressed
        return False

# Asynchronous resource manager
class AsyncResourceManager:
    def __init__(self, resource_name):
        self.resource_name = resource_name
        self.acquired = False
    
    async def __aenter__(self):
        print(f"🔓 Acquiring resource: {self.resource_name}")
        await asyncio.sleep(0.1)
        self.acquired = True
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"🔒 Releasing resource: {self.resource_name}")
        await asyncio.sleep(0.05)
        self.acquired = False
    
    async def do_work(self):
        if not self.acquired:
            raise RuntimeError("resource not acquired")
        
        print(f"⚙️ Working with resource {self.resource_name}")
        await asyncio.sleep(0.2)
        return f"work_result_from_{self.resource_name}"

# Usage example
async def context_manager_demo():
    print("=== Asynchronous database connection example ===")
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        print(f"🗃️ Using connection: {conn}")
        await asyncio.sleep(0.1)
    
    print("\n=== Asynchronous resource management example ===")
    async with AsyncResourceManager("critical_resource") as resource:
        result = await resource.do_work()
        print(f"📊 Work result: {result}")
    
    print("\n=== Exception handling example ===")
    try:
        async with AsyncDatabaseConnection("invalid://db") as conn:
            print("This line runs, then the exception is raised")
            raise ValueError("simulated business error")
    except ValueError as e:
        print(f"🚨 Caught exception: {e}")
    
    print("\n=== File I/O example ===")
    # Uses the third-party library aiofiles
    try:
        async with aiofiles.open('temp_file.txt', 'w') as f:
            await f.write("Asynchronous file write test\n")
            await f.write("Second line\n")
        
        async with aiofiles.open('temp_file.txt', 'r') as f:
            content = await f.read()
            print(f"📄 File contents:\n{content}")
    except Exception as e:
        print(f"File operation skipped: {e}")

asyncio.run(context_manager_demo())

7.3 Asynchronous Semaphores and Locks

import asyncio
import random

class AsyncConcurrencyControl:
    def __init__(self):
        # Semaphore: limits how many coroutines run a section at once
        self.semaphore = asyncio.Semaphore(3)
        
        # Mutex lock: protects a shared resource
        self.lock = asyncio.Lock()
        
        # Event: signalling between coroutines
        self.event = asyncio.Event()
        
        # Condition variable: more complex synchronization
        self.condition = asyncio.Condition()
        
        # Shared resources
        self.shared_counter = 0
        self.shared_list = []
    
    async def semaphore_demo(self, worker_id):
        """Limit concurrency with a semaphore"""
        async with self.semaphore:
            print(f"🔧 Worker {worker_id} started")
            # Simulate work time
            work_time = random.uniform(1, 3)
            await asyncio.sleep(work_time)
            print(f"✅ Worker {worker_id} finished (took {work_time:.1f}s)")
    
    async def lock_demo(self, worker_id):
        """Protect a shared resource with a mutex lock"""
        async with self.lock:
            print(f"🔒 Worker {worker_id} acquired the lock")
            
            # Critical section: modify the shared resource
            old_value = self.shared_counter
            await asyncio.sleep(0.1)  # simulate processing time
            self.shared_counter = old_value + 1
            
            print(f"📊 Worker {worker_id} updated counter: {old_value} → {self.shared_counter}")
    
    async def event_waiter(self, waiter_id):
        """Wait for the event to be set"""
        print(f"⏳ Waiter {waiter_id} is waiting for the event")
        await self.event.wait()
        print(f"🎉 Waiter {waiter_id} received the event")
    
    async def event_setter(self):
        """Set the event"""
        await asyncio.sleep(2)  # trigger the event after 2 seconds
        print("📡 Event set")
        self.event.set()
    
    async def condition_consumer(self, consumer_id):
        """Condition variable consumer"""
        async with self.condition:
            await self.condition.wait_for(lambda: len(self.shared_list) > 0)
            item = self.shared_list.pop(0)
            print(f"🔽 Consumer {consumer_id} consumed: {item}")
    
    async def condition_producer(self, producer_id):
        """Condition variable producer"""
        for i in range(3):
            await asyncio.sleep(1)
            async with self.condition:
                item = f"item_{producer_id}_{i}"
                self.shared_list.append(item)
                print(f"🔼 Producer {producer_id} produced: {item}")
                self.condition.notify_all()  # wake every waiter

# Concurrency control example
async def concurrency_control_demo():
    controller = AsyncConcurrencyControl()
    
    print("=== Limiting concurrency with a semaphore ===")
    # Create 5 workers, but the semaphore lets only 3 run at once
    semaphore_tasks = [
        asyncio.create_task(controller.semaphore_demo(i))
        for i in range(1, 6)
    ]
    await asyncio.gather(*semaphore_tasks)
    
    print("\n=== Protecting a shared resource with a lock ===")
    # Several workers modify the shared counter at the same time
    lock_tasks = [
        asyncio.create_task(controller.lock_demo(i))
        for i in range(1, 6)
    ]
    await asyncio.gather(*lock_tasks)
    print(f"🏁 Final counter value: {controller.shared_counter}")
    
    print("\n=== Event notification example ===")
    # Create the waiters and the event setter
    event_tasks = [
        asyncio.create_task(controller.event_waiter(i))
        for i in range(1, 4)
    ]
    event_tasks.append(asyncio.create_task(controller.event_setter()))
    await asyncio.gather(*event_tasks)
    
    print("\n=== Condition variable example ===")
    # Producer-consumer pattern built on a condition variable
    condition_tasks = [
        asyncio.create_task(controller.condition_producer(1)),
        asyncio.create_task(controller.condition_consumer(1)),
        asyncio.create_task(controller.condition_consumer(2)),
    ]
    await asyncio.gather(*condition_tasks)

asyncio.run(concurrency_control_demo())

Exception Handling

8.1 Basics of Exception Handling in Async Code

import asyncio

class AsyncExceptionHandling:
    
    async def basic_exception_demo(self):
        """Basic exception handling"""
        try:
            await self.failing_operation()
        except ValueError as e:
            print(f"❌ Caught ValueError: {e}")
        except asyncio.TimeoutError as e:
            print(f"⏰ Operation timed out: {e}")
        except Exception as e:
            print(f"🚨 Unknown exception: {type(e).__name__}: {e}")
        else:
            print("✅ Operation completed successfully")
        finally:
            print("🧹 Cleaning up")
    
    async def failing_operation(self):
        """Simulate an async operation that may fail"""
        import random
        await asyncio.sleep(0.1)
        
        error_type = random.choice(['value', 'timeout', 'success'])
        if error_type == 'value':
            raise ValueError("data validation failed")
        elif error_type == 'timeout':
            raise asyncio.TimeoutError("operation timed out")
        else:
            return "operation succeeded"
    
    async def timeout_handling_demo(self):
        """Timeout handling example"""
        try:
            # Apply a 2-second timeout
            result = await asyncio.wait_for(
                self.slow_operation(3),  # an operation that needs 3 seconds
                timeout=2.0
            )
            print(f"✅ Operation finished: {result}")
        except asyncio.TimeoutError:
            print("⏰ Operation timed out and was cancelled")
    
    async def slow_operation(self, duration):
        """Simulate a slow operation"""
        print(f"🐌 Starting slow operation (needs {duration}s)")
        await asyncio.sleep(duration)
        return f"finished {duration}s operation"
    
    async def gather_exception_demo(self):
        """Exception handling with gather"""
        print("\n--- gather exception handling (return_exceptions=False) ---")
        try:
            results = await asyncio.gather(
                self.operation_success("A"),
                self.operation_failure("B"),  # this one fails
                self.operation_success("C"),
                # return_exceptions=False  # the default: the exception is raised
            )
        except ValueError as e:
            print(f"❌ gather interrupted by exception: {e}")
        
        print("\n--- gather exception handling (return_exceptions=True) ---")
        results = await asyncio.gather(
            self.operation_success("A"),
            self.operation_failure("B"),  # this one fails
            self.operation_success("C"),
            return_exceptions=True  # exceptions are returned as results
        )
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"🔴 Task {i} raised: {result}")
            else:
                print(f"🟢 Task {i} succeeded: {result}")
    
    async def operation_success(self, name):
        await asyncio.sleep(0.1)
        return f"operation {name} succeeded"
    
    async def operation_failure(self, name):
        await asyncio.sleep(0.1)
        raise ValueError(f"operation {name} failed")

# Exception groups (Python 3.11+)
# Note: except* is new syntax, so on older interpreters this file fails
# to compile regardless of any runtime version check.
class AsyncExceptionGroups:
    
    async def exception_group_demo(self):
        """Exception group handling example (Python 3.11+)"""
        try:
            results = await asyncio.gather(
                self.coro_error_a(),
                self.coro_error_b(),
                self.coro_error_c(),
                return_exceptions=True
            )
            
            # Collect every exception
            exceptions = [r for r in results if isinstance(r, Exception)]
            if exceptions:
                raise ExceptionGroup("batch operation failed", exceptions)
                
        except* ValueError as eg:
            print(f"🔴 Handling ValueError group: {len(eg.exceptions)} item(s)")
            for exc in eg.exceptions:
                print(f"  - {exc}")
                
        except* TypeError as eg:
            print(f"🟠 Handling TypeError group: {len(eg.exceptions)} item(s)")
            for exc in eg.exceptions:
                print(f"  - {exc}")
                
        except* KeyError as eg:
            print(f"🟡 Handling KeyError group: {len(eg.exceptions)} item(s)")
            for exc in eg.exceptions:
                print(f"  - {exc}")
    
    async def coro_error_a(self):
        await asyncio.sleep(0.1)
        raise ValueError("operation A failed")
    
    async def coro_error_b(self):
        await asyncio.sleep(0.1)
        raise TypeError("operation B type error")
    
    async def coro_error_c(self):
        await asyncio.sleep(0.1)
        raise KeyError("operation C key error")

# Exception handling best practices
class AsyncExceptionBestPractices:
    
    async def robust_operation_demo(self):
        """Example of a robust async operation with retries"""
        max_retries = 3
        retry_delay = 1
        
        for attempt in range(max_retries):
            try:
                result = await asyncio.wait_for(
                    self.unreliable_operation(),
                    timeout=5.0
                )
                print(f"✅ Operation succeeded: {result}")
                return result
                
            except asyncio.TimeoutError:
                print(f"⏰ Attempt {attempt + 1} timed out")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    print("❌ Every retry timed out, operation failed")
                    raise
                    
            except Exception as e:
                print(f"🚨 Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                else:
                    print("❌ Every retry failed")
                    raise
    
    async def unreliable_operation(self):
        """Simulate an unreliable operation"""
        import random
        await asyncio.sleep(random.uniform(0.1, 6))  # random delay
        
        if random.random() < 0.7:  # fails 70% of the time
            raise ConnectionError("unstable network connection")
        
        return "operation completed successfully"

# Run the exception handling examples
import sys

async def exception_handling_demo():
    handler = AsyncExceptionHandling()
    
    print("=== Basic exception handling ===")
    await handler.basic_exception_demo()
    
    print("\n=== Timeout handling ===")
    await handler.timeout_handling_demo()
    
    print("\n=== gather exception handling ===")
    await handler.gather_exception_demo()
    
    # Python 3.11+ feature
    if sys.version_info >= (3, 11):
        print("\n=== Exception group handling (Python 3.11+) ===")
        group_handler = AsyncExceptionGroups()
        await group_handler.exception_group_demo()
    
    print("\n=== Robust operation example ===")
    best_practices = AsyncExceptionBestPractices()
    try:
        await best_practices.robust_operation_demo()
    except Exception as e:
        print(f"Final failure: {e}")

asyncio.run(exception_handling_demo())

Performance Optimization Strategies

9.1 Performance Monitoring and Analysis

import asyncio
import time
import functools
import cProfile
import pstats
from contextlib import asynccontextmanager

class AsyncPerformanceAnalyzer:
    def __init__(self):
        self.metrics = {}
    
    def timer(self, name=None):
        """Decorator that times an async function"""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                func_name = name or f"{func.__module__}.{func.__name__}"
                start_time = time.perf_counter()
                
                try:
                    result = await func(*args, **kwargs)
                    return result
                finally:
                    end_time = time.perf_counter()
                    execution_time = end_time - start_time
                    
                    if func_name not in self.metrics:
                        self.metrics[func_name] = []
                    self.metrics[func_name].append(execution_time)
                    
                    print(f"⏱️ {func_name} execution time: {execution_time:.4f}s")
            
            return wrapper
        return decorator
    
    @asynccontextmanager
    async def profile_context(self, name="async_profile"):
        """Context manager that profiles a block of async code"""
        profiler = cProfile.Profile()
        profiler.enable()
        
        start_time = time.perf_counter()
        try:
            yield
        finally:
            end_time = time.perf_counter()
            profiler.disable()
            
            print(f"\n📊 Profiling report: {name}")
            print(f"Total execution time: {end_time - start_time:.4f}s")
            
            # Build the statistics object
            stats = pstats.Stats(profiler)
            stats.sort_stats('cumulative')
            stats.print_stats(10)  # show the 10 most expensive functions
    
    def print_summary(self):
        """Print a performance summary"""
        print("\n📈 Performance summary:")
        for func_name, times in self.metrics.items():
            avg_time = sum(times) / len(times)
            min_time = min(times)
            max_time = max(times)
            total_time = sum(times)
            
            print(f"  {func_name}:")
            print(f"    Calls: {len(times)}")
            print(f"    Average time: {avg_time:.4f}s")
            print(f"    Shortest time: {min_time:.4f}s")
            print(f"    Longest time: {max_time:.4f}s")
            print(f"    Total time: {total_time:.4f}s")
# Performance optimization example
# One shared analyzer, so the timings recorded by the decorators below
# end up in the same metrics dict that self.analyzer exposes.
analyzer = AsyncPerformanceAnalyzer()

class AsyncPerformanceOptimization:
    def __init__(self):
        self.analyzer = analyzer
    
    @analyzer.timer("slow_sequential")
    async def slow_sequential_approach(self, urls):
        """Inefficient sequential approach"""
        results = []
        for url in urls:
            result = await self.simulate_network_request(url)
            results.append(result)
        return results
    
    @analyzer.timer("fast_concurrent")
    async def fast_concurrent_approach(self, urls):
        """Efficient concurrent approach"""
        tasks = [self.simulate_network_request(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
    
    @analyzer.timer("optimized_concurrent")
    async def optimized_concurrent_approach(self, urls, max_concurrent=10):
        """Optimized concurrent approach: caps the number of concurrent requests"""
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def limited_request(url):
            async with semaphore:
                return await self.simulate_network_request(url)
        
        tasks = [limited_request(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
    
    async def simulate_network_request(self, url):
        """Simulate a network request"""
        await asyncio.sleep(0.1)  # simulate network latency
        return f"Response from {url}"
    
    async def cpu_bound_task(self, n):
        """CPU-bound task (not a good fit for asyncio)"""
        total = 0
        for i in range(n):
            total += i * i
            if i % 100000 == 0:
                await asyncio.sleep(0)  # yield control to the event loop
        return total
    
    async def memory_efficient_processing(self, large_dataset):
        """Memory-efficient data processing"""
        # Process the large dataset chunk by chunk with a helper coroutine
        async def process_chunk(chunk):
            await asyncio.sleep(0.01)  # simulate processing time
            return [item * 2 for item in chunk]
        
        chunk_size = 1000
        results = []
        
        for i in range(0, len(large_dataset), chunk_size):
            chunk = large_dataset[i:i + chunk_size]
            processed_chunk = await process_chunk(chunk)
            results.extend(processed_chunk)
            
            # Yield control periodically
            if i % (chunk_size * 10) == 0:
                await asyncio.sleep(0)
        
        return results

# Cache optimization
class AsyncCacheOptimization:
    def __init__(self):
        self.cache = {}
        self.cache_stats = {"hits": 0, "misses": 0}
    
    # Defined without self so it can be applied as a decorator inside the
    # class body; the wrapper receives the instance as its first argument.
    # The cache dict lives in the closure, so all instances share it.
    def async_lru_cache(maxsize=128):
        """Async LRU cache decorator"""
        def decorator(func):
            cache = {}
            cache_order = []
            
            @functools.wraps(func)
            async def wrapper(self, *args, **kwargs):
                key = str(args) + str(sorted(kwargs.items()))
                
                if key in cache:
                    # Cache hit: refresh the access order
                    cache_order.remove(key)
                    cache_order.append(key)
                    self.cache_stats["hits"] += 1
                    print(f"💾 Cache hit: {func.__name__}")
                    return cache[key]
                
                # Cache miss
                self.cache_stats["misses"] += 1
                result = await func(self, *args, **kwargs)
                
                # Store in the cache
                cache[key] = result
                cache_order.append(key)
                
                # Over the size limit: evict the least recently used entry
                if len(cache) > maxsize:
                    oldest_key = cache_order.pop(0)
                    del cache[oldest_key]
                
                print(f"💿 Cache store: {func.__name__}")
                return result
            
            return wrapper
        return decorator
    
    @async_lru_cache(maxsize=32)
    async def expensive_computation(self, x, y):
        """An expensive computation"""
        print(f"🔥 Running expensive computation: f({x}, {y})")
        await asyncio.sleep(1)  # simulate a slow computation
        return x ** y + y ** x
    
    def print_cache_stats(self):
        total = self.cache_stats["hits"] + self.cache_stats["misses"]
        if total > 0:
            hit_rate = self.cache_stats["hits"] / total * 100
            print(f"📊 Cache stats: hit rate {hit_rate:.1f}% ({self.cache_stats['hits']}/{total})")
# 连接池优化
class AsyncConnectionPoolOptimization:
    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        self.semaphore = asyncio.Semaphore(pool_size)
        self.active_connections = 0
        self.total_requests = 0
    
    @asynccontextmanager
    async def get_connection(self):
        """模拟连接池中的连接获取"""
        async with self.semaphore:
            self.active_connections += 1
            self.total_requests += 1
            
            print(f"🔗 获取连接 (活跃: {self.active_connections}/{self.pool_size})")
            
            try:
                # 模拟连接建立时间
                await asyncio.sleep(0.01)
                yield f"connection_{self.total_requests}"
            finally:
                self.active_connections -= 1
                print(f"📤 释放连接 (活跃: {self.active_connections}/{self.pool_size})")
    
    async def make_request_with_pool(self, request_id):
        """使用连接池发送请求"""
        async with self.get_connection() as conn:
            # 模拟请求处理
            await asyncio.sleep(0.5)
            return f"Response for request {request_id} via {conn}"

# 性能测试运行
async def performance_optimization_demo():
    print("=== 性能优化示例 ===")
    
    optimizer = AsyncPerformanceOptimization()
    
    # 测试URL列表
    test_urls = [f"http://api{i}.example.com" for i in range(20)]
    
    # 对比不同方法的性能
    print("🐌 顺序方法:")
    await optimizer.slow_sequential_approach(test_urls[:5])  # 只测试5个URL
    
    print(f"\n🚀 并发方法:")
    await optimizer.fast_concurrent_approach(test_urls)
    
    print(f"\n⚡ 优化并发方法:")
    await optimizer.optimized_concurrent_approach(test_urls, max_concurrent=5)
    
    # 缓存优化测试
    print(f"\n=== 缓存优化测试 ===")
    cache_optimizer = AsyncCacheOptimization()
    
    # 第一次调用 - 缓存未命中
    await cache_optimizer.expensive_computation(2, 3)
    await cache_optimizer.expensive_computation(3, 4)
    
    # 第二次调用 - 缓存命中
    await cache_optimizer.expensive_computation(2, 3)
    await cache_optimizer.expensive_computation(3, 4)
    
    cache_optimizer.print_cache_stats()
    
    # 连接池优化测试
    print(f"\n=== 连接池优化测试 ===")
    pool_optimizer = AsyncConnectionPoolOptimization(pool_size=3)
    
    # 并发请求超过连接池大小
    requests = [
        asyncio.create_task(pool_optimizer.make_request_with_pool(i))
        for i in range(8)
    ]
    
    results = await asyncio.gather(*requests)
    print(f"✅ 完成 {len(results)} 个请求")

asyncio.run(performance_optimization_demo())
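
The hand-written LRU cache above keeps its access order in a list, so every hit pays for a linear `remove`, and two callers that miss on the same key at the same time both run the expensive coroutine. The following is a minimal sketch of one way to address both points, assuming hashable arguments: it stores a task per key in an `OrderedDict`, so concurrent callers share a single computation. The names `async_lru_cache` (as a standalone function) and `slow_square` are illustrative only and are not part of the original classes.

```python
import asyncio
import functools
from collections import OrderedDict


def async_lru_cache(maxsize=128):
    """Illustrative LRU cache for coroutine functions with hashable arguments."""
    def decorator(func):
        cache = OrderedDict()  # key -> asyncio.Task that produces the result

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            task = cache.get(key)
            if task is None:
                # First caller starts the computation; later callers await the same task
                task = asyncio.create_task(func(*args, **kwargs))
                cache[key] = task
                if len(cache) > maxsize:
                    cache.popitem(last=False)  # evict the least recently used entry
            else:
                cache.move_to_end(key)  # mark as most recently used
            try:
                # shield() keeps the shared task alive if this particular caller is cancelled
                return await asyncio.shield(task)
            except Exception:
                # Do not keep failed computations in the cache
                if cache.get(key) is task:
                    del cache[key]
                raise

        return wrapper
    return decorator


@async_lru_cache(maxsize=32)
async def slow_square(x):
    await asyncio.sleep(0.1)  # stand-in for an expensive awaitable operation
    return x * x


async def main():
    # Both calls miss at the same moment but share one underlying computation
    print(await asyncio.gather(slow_square(7), slow_square(7)))


if __name__ == "__main__":
    asyncio.run(main())
```

Caching the task rather than the finished value is a design choice: it removes duplicate work under concurrency, at the cost of holding a task object per entry.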

Real-World Application Scenarios

10.1 Web Crawler Application

import asyncio
import aiohttp
import time
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

class AsyncWebCrawler:
    def __init__(self, max_concurrent=10, delay=1):
        self.max_concurrent = max_concurrent
        self.delay = delay  # 请求间隔,避免过于频繁
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.visited_urls = set()
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10),
            headers={'User-Agent': 'AsyncWebCrawler/1.0'}
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_url(self, url):
        """获取单个URL的内容"""
        if url in self.visited_urls:
            return None
        
        self.visited_urls.add(url)
        
        async with self.semaphore:
            try:
                print(f"🔍 爬取: {url}")
                
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        await asyncio.sleep(self.delay)  # 礼貌延迟
                        return {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'headers': dict(response.headers)
                        }
                    else:
                        print(f"❌ HTTP {response.status}: {url}")
                        return None
                        
            except Exception as e:
                print(f"🚨 错误 {url}: {e}")
                return None
    
    async def extract_links(self, html_content, base_url):
        """从HTML中提取链接"""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            links = []
            
            for link in soup.find_all('a', href=True):
                href = link['href']
                absolute_url = urljoin(base_url, href)
                
                # 只处理HTTP/HTTPS链接
                if absolute_url.startswith(('http://', 'https://')):
                    links.append(absolute_url)
            
            return links
        except Exception as e:
            print(f"🔗 链接提取错误: {e}")
            return []
    
    async def crawl_website(self, start_urls, max_pages=50):
        """爬取整个网站"""
        crawl_queue = list(start_urls)
        crawled_data = []
        pages_crawled = 0
        
        while crawl_queue and pages_crawled < max_pages:
            # 获取下一批要爬取的URL
            batch_size = min(self.max_concurrent, len(crawl_queue))
            current_batch = crawl_queue[:batch_size]
            crawl_queue = crawl_queue[batch_size:]
            
            # 并发爬取当前批次
            tasks = [self.fetch_url(url) for url in current_batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 处理结果
            for result in results:
                if result and not isinstance(result, Exception):
                    crawled_data.append(result)
                    pages_crawled += 1
                    
                    # 提取新链接加入队列
                    new_links = await self.extract_links(
                        result['content'], 
                        result['url']
                    )
                    
                    # 过滤已访问的链接
                    for link in new_links:
                        if (link not in self.visited_urls and 
                            link not in crawl_queue and
                            self.is_same_domain(link, start_urls[0])):
                            crawl_queue.append(link)
        
        return crawled_data
    
    def is_same_domain(self, url1, url2):
        """检查两个URL是否在同一域名下"""
        return urlparse(url1).netloc == urlparse(url2).netloc

# Web服务器应用
class AsyncWebServer:
    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port
        self.routes = {}
        self.middleware = []
    
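    def route(self, path, methods=None):
        """Route decorator (methods defaults to GET only)"""
        # Avoid a mutable default argument shared across calls
        allowed_methods = tuple(methods) if methods else ('GET',)
        
        def decorator(handler):
            self.routes[path] = {
                'handler': handler,
                'methods': allowed_methods
            }
            return handler
        return decorator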
    
    def middleware_decorator(self, middleware_func):
        """中间件装饰器"""
        self.middleware.append(middleware_func)
        return middleware_func
    
    async def handle_request(self, reader, writer):
        """处理HTTP请求"""
        try:
            # 读取请求行
            request_line = await reader.readline()
            request_line = request_line.decode().strip()
            
            if not request_line:
                return
            
            method, path, _ = request_line.split(' ', 2)
            
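            # Read headers until the blank line; b'' means the client closed early
            headers = {}
            while True:
                line = await reader.readline()
                if line in (b'\r\n', b'\n', b''):
                    break
                key, sep, value = line.decode().strip().partition(':')
                if sep:  # skip malformed header lines
                    headers[key.strip()] = value.strip()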
            
            # 构造请求对象
            request = {
                'method': method,
                'path': path,
                'headers': headers,
                'reader': reader
            }
            
            # 应用中间件
            for middleware in self.middleware:
                request = await middleware(request) or request
            
            # Route the request, keeping the status line and body separate
            status_line = "200 OK"
            if path in self.routes:
                route_info = self.routes[path]
                if method in route_info['methods']:
                    body = await route_info['handler'](request)
                else:
                    status_line, body = "405 Method Not Allowed", "405 - Method Not Allowed"
            else:
                status_line, body = "404 Not Found", "404 - Page Not Found"
            
            # Send the response; Content-Length counts bytes, not characters
            payload = str(body).encode()
            head = f"HTTP/1.1 {status_line}\r\nContent-Length: {len(payload)}\r\nConnection: close\r\n\r\n"
            writer.write(head.encode() + payload)
            await writer.drain()
            
        except Exception as e:
            error_response = f"HTTP/1.1 500 Internal Server Error\r\n\r\nError: {str(e)}"
            writer.write(error_response.encode())
            await writer.drain()
        finally:
            writer.close()
            await writer.wait_closed()
    
    async def start_server(self):
        """启动服务器"""
        server = await asyncio.start_server(
            self.handle_request, 
            self.host, 
            self.port
        )
        
        print(f"🌐 服务器启动: http://{self.host}:{self.port}")
        
        async with server:
            await server.serve_forever()

# 数据库连接池应用
class AsyncDatabasePool:
    def __init__(self, connection_string, pool_size=10):
        self.connection_string = connection_string
        self.pool_size = pool_size
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.created_connections = 0
        self.active_connections = 0
    
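    async def create_connection(self):
        """Create a new database connection"""
        self.created_connections += 1
        # Capture the id before awaiting so concurrent creators get unique names
        connection_id = self.created_connections
        await asyncio.sleep(0.1)  # simulate connection setup time
        return f"db_connection_{connection_id}"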
    
    async def get_connection(self):
        """从连接池获取连接"""
        try:
            # 尝试从池中获取现有连接
            connection = self.pool.get_nowait()
        except asyncio.QueueEmpty:
            # 如果池为空且未达到最大连接数,创建新连接
            if self.created_connections < self.pool_size:
                connection = await self.create_connection()
            else:
                # 等待池中有可用连接
                connection = await self.pool.get()
        
        self.active_connections += 1
        return connection
    
    async def return_connection(self, connection):
        """将连接返回池中"""
        self.active_connections -= 1
        await self.pool.put(connection)
    
    async def execute_query(self, query):
        """执行数据库查询"""
        connection = await self.get_connection()
        try:
            print(f"🗃️ 执行查询: {query} (连接: {connection})")
            await asyncio.sleep(0.2)  # 模拟查询执行时间
            return f"查询结果: {query}"
        finally:
            await self.return_connection(connection)
    
    def get_pool_stats(self):
        """获取连接池统计信息"""
        return {
            "总连接数": self.created_connections,
            "活跃连接数": self.active_connections,
            "池中可用连接数": self.pool.qsize()
        }

# 实时数据处理应用
class AsyncRealTimeProcessor:
    def __init__(self):
        self.data_queue = asyncio.Queue()
        self.processed_count = 0
        self.error_count = 0
        self.running = False
    
    async def data_producer(self, data_source_name, count):
        """数据生产者 - 模拟实时数据流"""
        for i in range(count):
            data = {
                'id': f"{data_source_name}_{i}",
                'timestamp': time.time(),
                'value': i * 10 + (i % 7),  # 模拟数据
                'source': data_source_name
            }
            
            await self.data_queue.put(data)
            print(f"📨 生产数据: {data['id']}")
            await asyncio.sleep(0.1)  # 模拟数据产生间隔
        
        print(f"✅ 数据源 {data_source_name} 完成")
    
    async def data_processor(self, processor_id):
        """数据处理器"""
        while self.running or not self.data_queue.empty():
            try:
                # 等待数据,设置超时避免无限等待
                data = await asyncio.wait_for(
                    self.data_queue.get(), 
                    timeout=1.0
                )
                
                # 处理数据
                processed_data = await self.process_single_item(data)
                
                if processed_data:
                    self.processed_count += 1
                    print(f"✨ 处理器{processor_id}完成: {data['id']} -> {processed_data['result']}")
                
            except asyncio.TimeoutError:
                if not self.running:
                    break
            except Exception as e:
                self.error_count += 1
                print(f"❌ 处理器{processor_id}错误: {e}")
    
    async def process_single_item(self, data):
        """处理单个数据项"""
        try:
            # 模拟数据处理逻辑
            await asyncio.sleep(0.2)  # 模拟处理时间
            
            # 简单的数据转换
            result = data['value'] * 2 + 1
            
            return {
                'original': data,
                'result': result,
                'processed_at': time.time()
            }
        except Exception as e:
            print(f"🔥 处理数据 {data['id']} 时出错: {e}")
            return None
    
    async def start_processing(self, num_processors=3):
        """启动实时数据处理"""
        self.running = True
        
        # 启动多个数据处理器
        processors = [
            asyncio.create_task(self.data_processor(i))
            for i in range(1, num_processors + 1)
        ]
        
        return processors
    
    def stop_processing(self):
        """停止处理"""
        self.running = False
    
    def get_stats(self):
        """获取处理统计"""
        return {
            "已处理": self.processed_count,
            "错误数": self.error_count,
            "队列大小": self.data_queue.qsize()
        }

# WebSocket服务器应用
class AsyncWebSocketServer:
    def __init__(self):
        self.clients = set()
        self.rooms = {}  # 房间系统
    
    async def register_client(self, websocket, path=None):
        """Register a new client (path is only passed by older versions of the websockets library)"""
        self.clients.add(websocket)
        print(f"👤 客户端连接: {websocket.remote_address}")
        
        try:
            await self.handle_client(websocket)
        except Exception as e:
            print(f"🚨 客户端处理错误: {e}")
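        finally:
            # discard() is safe even if a broadcast already removed this client
            self.clients.discard(websocket)
            for room_clients in self.rooms.values():
                room_clients.discard(websocket)
            print(f"👋 Client disconnected: {websocket.remote_address}")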
    
    async def handle_client(self, websocket):
        """处理客户端消息"""
        async for message in websocket:
            try:
                # 解析消息
                import json
                data = json.loads(message)
                message_type = data.get('type', 'message')
                
                if message_type == 'join_room':
                    await self.join_room(websocket, data['room'])
                elif message_type == 'leave_room':
                    await self.leave_room(websocket, data['room'])
                elif message_type == 'broadcast':
                    await self.broadcast_to_room(data['room'], data['message'])
                elif message_type == 'private':
                    await self.send_private_message(websocket, data)
                else:
                    await self.broadcast_to_all(data)
                    
            except json.JSONDecodeError:
                await websocket.send("错误: 无效的JSON格式")
            except Exception as e:
                await websocket.send(f"错误: {str(e)}")
    
    async def join_room(self, websocket, room_name):
        """加入房间"""
        if room_name not in self.rooms:
            self.rooms[room_name] = set()
        
        self.rooms[room_name].add(websocket)
        await websocket.send(f"已加入房间: {room_name}")
        print(f"🏠 客户端加入房间 {room_name}")
    
    async def leave_room(self, websocket, room_name):
        """离开房间"""
        if room_name in self.rooms:
            self.rooms[room_name].discard(websocket)
            if not self.rooms[room_name]:
                del self.rooms[room_name]
        
        await websocket.send(f"已离开房间: {room_name}")
    
    async def broadcast_to_room(self, room_name, message):
        """向房间广播消息"""
        if room_name in self.rooms:
            disconnected = set()
            for client in self.rooms[room_name].copy():
                try:
                    await client.send(f"[房间 {room_name}] {message}")
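                except Exception:  # a bare except would also swallow CancelledError
                    disconnected.add(client)
            
            # Remove disconnected clients; the room may have been deleted while we awaited
            if room_name in self.rooms:
                self.rooms[room_name] -= disconnected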
    
    async def broadcast_to_all(self, data):
        """向所有客户端广播"""
        message = data.get('message', '空消息')
        disconnected = set()
        
        for client in self.clients.copy():
            try:
                await client.send(f"[广播] {message}")
                except Exception:  # a bare except would also swallow CancelledError
                disconnected.add(client)
        
        # 清理断开的连接
        self.clients -= disconnected
    
    def get_server_stats(self):
        """获取服务器统计信息"""
        return {
            "在线客户端": len(self.clients),
            "活跃房间": len(self.rooms),
            "房间详情": {name: len(clients) for name, clients in self.rooms.items()}
        }

# API客户端应用
class AsyncAPIClient:
    def __init__(self, base_url, max_concurrent=10):
        self.base_url = base_url.rstrip('/')
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.request_stats = {
            "total": 0,
            "success": 0,
            "error": 0,
            "response_times": []
        }
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'AsyncAPIClient/1.0'}
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def make_request(self, method, endpoint, **kwargs):
        """发送HTTP请求"""
        async with self.semaphore:
            url = f"{self.base_url}{endpoint}"
            start_time = time.time()
            
            try:
                self.request_stats["total"] += 1
                
                async with self.session.request(method, url, **kwargs) as response:
                    response_time = time.time() - start_time
                    self.request_stats["response_times"].append(response_time)
                    
                    if response.status < 400:
                        # Parse first so a malformed body is counted once, as an error
                        data = await response.json()
                        self.request_stats["success"] += 1
                        return {
                            "status": response.status,
                            "data": data,
                            "response_time": response_time
                        }
                    else:
                        self.request_stats["error"] += 1
                        return {
                            "status": response.status,
                            "error": f"HTTP {response.status}",
                            "response_time": response_time
                        }
                        
            except Exception as e:
                self.request_stats["error"] += 1
                response_time = time.time() - start_time
                return {
                    "status": 0,
                    "error": str(e),
                    "response_time": response_time
                }
    
    async def batch_requests(self, requests):
        """批量发送请求"""
        tasks = [
            self.make_request(req["method"], req["endpoint"], **req.get("kwargs", {}))
            for req in requests
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
    
    async def get(self, endpoint, **kwargs):
        return await self.make_request("GET", endpoint, **kwargs)
    
    async def post(self, endpoint, **kwargs):
        return await self.make_request("POST", endpoint, **kwargs)
    
    async def put(self, endpoint, **kwargs):
        return await self.make_request("PUT", endpoint, **kwargs)
    
    async def delete(self, endpoint, **kwargs):
        return await self.make_request("DELETE", endpoint, **kwargs)
    
    def get_stats(self):
        """获取请求统计信息"""
        if self.request_stats["response_times"]:
            avg_response_time = sum(self.request_stats["response_times"]) / len(self.request_stats["response_times"])
            max_response_time = max(self.request_stats["response_times"])
            min_response_time = min(self.request_stats["response_times"])
        else:
            avg_response_time = max_response_time = min_response_time = 0
        
        success_rate = (self.request_stats["success"] / self.request_stats["total"] * 100) if self.request_stats["total"] > 0 else 0
        
        return {
            "总请求数": self.request_stats["total"],
            "成功数": self.request_stats["success"],
            "失败数": self.request_stats["error"],
            "成功率": f"{success_rate:.1f}%",
            "平均响应时间": f"{avg_response_time:.3f}s",
            "最大响应时间": f"{max_response_time:.3f}s",
            "最小响应时间": f"{min_response_time:.3f}s"
        }

# 应用示例演示
async def application_demos():
    print("=== 异步应用场景演示 ===\n")
    
    # 1. Web爬虫演示
    print("🕷️ Web爬虫演示:")
    try:
        async with AsyncWebCrawler(max_concurrent=3, delay=0.5) as crawler:
            # 演示爬取少量页面
            start_urls = ["http://httpbin.org/"]
            crawled_data = await crawler.crawl_website(start_urls, max_pages=3)
            print(f"✅ 爬取完成,获得 {len(crawled_data)} 个页面")
    except Exception as e:
        print(f"⚠️ 爬虫演示跳过: {e}")
    
    # 2. 数据库连接池演示
    print(f"\n🗄️ 数据库连接池演示:")
    db_pool = AsyncDatabasePool("mock://database", pool_size=3)
    
    # 模拟多个并发数据库操作
    queries = [
        "SELECT * FROM users",
        "SELECT * FROM orders", 
        "SELECT * FROM products",
        "SELECT * FROM categories",
        "SELECT * FROM reviews"
    ]
    
    query_tasks = [db_pool.execute_query(query) for query in queries]
    results = await asyncio.gather(*query_tasks)
    
    print(f"📊 连接池状态: {db_pool.get_pool_stats()}")
    
    # 3. 实时数据处理演示
    print(f"\n⚡ 实时数据处理演示:")
    processor = AsyncRealTimeProcessor()
    
    # 启动处理器
    processing_tasks = await processor.start_processing(num_processors=2)
    
    # 启动数据生产者
    producer_tasks = [
        asyncio.create_task(processor.data_producer("sensor_A", 5)),
        asyncio.create_task(processor.data_producer("sensor_B", 3))
    ]
    
    # 等待生产者完成
    await asyncio.gather(*producer_tasks)
    
    # 等待一段时间让处理器处理完剩余数据
    await asyncio.sleep(2)
    
    # 停止处理器
    processor.stop_processing()
    await asyncio.gather(*processing_tasks)
    
    print(f"📈 处理统计: {processor.get_stats()}")
    
    # 4. API客户端演示
    print(f"\n🌐 API客户端演示:")
    try:
        async with AsyncAPIClient("https://httpbin.org") as client:
            # 发送多个并发请求
            test_requests = [
                {"method": "GET", "endpoint": "/get"},
                {"method": "POST", "endpoint": "/post", "kwargs": {"json": {"test": "data"}}},
                {"method": "GET", "endpoint": "/delay/1"},
            ]
            
            results = await client.batch_requests(test_requests)
            successful_requests = [r for r in results if not isinstance(r, Exception) and r.get("status", 0) < 400]
            
            print(f"✅ API请求完成: {len(successful_requests)}/{len(test_requests)} 成功")
            print(f"📊 客户端统计: {client.get_stats()}")
    except Exception as e:
        print(f"⚠️ API客户端演示跳过: {e}")

# 运行所有演示
asyncio.run(application_demos())
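
The real-time processor above shuts down by polling a `running` flag with a one-second timeout and then sleeping for a fixed two seconds. One alternative, sketched below under simplified assumptions, is to let the queue track completion itself: each worker calls `task_done()` after every item, and the coordinator waits on `queue.join()` before cancelling the workers. The function names `producer`, `worker` and `run_pipeline` are hypothetical and only mirror the shape of the original example.

```python
import asyncio


async def producer(queue, source, count):
    """Put `count` items on the queue, tagged with the source name."""
    for i in range(count):
        await queue.put({"id": f"{source}_{i}", "value": i * 10})


async def worker(queue, results):
    """Process items forever; the coordinator cancels this task when done."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real asynchronous processing
            results.append((item["id"], item["value"] * 2 + 1))
        finally:
            queue.task_done()  # always signal completion, even on failure


async def run_pipeline(num_workers=2):
    queue = asyncio.Queue(maxsize=100)  # bounded queue applies backpressure to producers
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]

    # Wait until every producer has finished enqueueing
    await asyncio.gather(producer(queue, "sensor_A", 5), producer(queue, "sensor_B", 3))

    # Wait until every enqueued item has been marked done
    await queue.join()

    # Workers are idle now, so cancelling them loses no data
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(len(asyncio.run(run_pipeline())))
```

With this pattern the shutdown time depends on the work actually remaining, not on a guessed sleep duration.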

Best Practices

11.1 Code Organization and Architecture

# 推荐的异步代码架构模式

# 1. 服务层架构
class AsyncServiceBase:
    """异步服务基类"""
    
    def __init__(self):
        self.logger = self.setup_logger()
        self.metrics = {"requests": 0, "errors": 0}
    
    def setup_logger(self):
        import logging
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)
        return logger
    
    async def __aenter__(self):
        await self.startup()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup()
    
    async def startup(self):
        """服务启动时的初始化"""
        self.logger.info(f"启动 {self.__class__.__name__}")
    
    async def cleanup(self):
        """服务关闭时的清理"""
        self.logger.info(f"关闭 {self.__class__.__name__}")
    
    def record_metric(self, metric_name, value=1):
        """记录指标"""
        if metric_name not in self.metrics:
            self.metrics[metric_name] = 0
        self.metrics[metric_name] += value

# 2. 异步工厂模式
class AsyncResourceFactory:
    """异步资源工厂"""
    
    @staticmethod
    async def create_database_connection():
        """创建数据库连接"""
        await asyncio.sleep(0.1)  # 模拟连接建立
        return "database_connection"
    
    @staticmethod
    async def create_redis_connection():
        """创建Redis连接"""
        await asyncio.sleep(0.05)  # 模拟连接建立
        return "redis_connection"
    
    @staticmethod
    async def create_message_queue():
        """创建消息队列"""
        return asyncio.Queue()

# 3. 异步单例模式
class AsyncSingleton:
    """异步单例模式"""
    _instances = {}
    _locks = {}
    
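    def __new__(cls, *args, **kwargs):
        # Create the per-class lock exactly once; replacing it later would let
        # two callers hold different locks and initialize twice
        if cls not in cls._locks:
            cls._locks[cls] = asyncio.Lock()
        return super().__new__(cls)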
    
    async def __aenter__(self):
        async with self._locks[self.__class__]:
            if self.__class__ not in self._instances:
                await self._initialize()
                self._instances[self.__class__] = self
        return self._instances[self.__class__]
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass
    
    async def _initialize(self):
        """异步初始化"""
        await asyncio.sleep(0.1)  # 模拟初始化过程
        print(f"初始化 {self.__class__.__name__}")

# 4. 配置管理
class AsyncConfig:
    """异步配置管理"""
    
    def __init__(self):
        self._config = {}
        self._config_lock = asyncio.Lock()
    
    async def load_config(self, config_source):
        """异步加载配置"""
        async with self._config_lock:
            # 模拟从文件/网络加载配置
            await asyncio.sleep(0.1)
            self._config.update(config_source)
    
    async def get_config(self, key, default=None):
        """获取配置项"""
        async with self._config_lock:
            return self._config.get(key, default)
    
    async def set_config(self, key, value):
        """设置配置项"""
        async with self._config_lock:
            self._config[key] = value

# 5. 错误恢复与重试
class AsyncRetryHandler:
    """异步重试处理器"""
    
    @staticmethod
    def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
        """重试装饰器"""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                last_exception = None
                current_delay = delay
                
                for attempt in range(max_attempts):
                    try:
                        return await func(*args, **kwargs)
                    except exceptions as e:
                        last_exception = e
                        if attempt < max_attempts - 1:
                            print(f"⚠️ 第{attempt + 1}次尝试失败: {e}, {current_delay}秒后重试")
                            await asyncio.sleep(current_delay)
                            current_delay *= backoff
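                        else:
                            print(f"❌ All {max_attempts} attempts failed")
                            raise  # re-raise with the original traceback
                
                # Only reached when max_attempts <= 0
                raise ValueError("max_attempts must be at least 1")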
            return wrapper
        return decorator

# 6. 异步上下文管理器链
class AsyncContextManager:
    """异步上下文管理器链"""
    
    def __init__(self, *managers):
        self.managers = managers
        self.entered = []
    
    async def __aenter__(self):
        try:
            for manager in self.managers:
                result = await manager.__aenter__()
                self.entered.append((manager, result))
            return [result for _, result in self.entered]
        except Exception:
            # 如果任何一个失败,清理已进入的管理器
            await self._cleanup()
            raise
    
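    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Forward the exception details so inner managers can react to them
        await self._cleanup(exc_type, exc_val, exc_tb)
    
    async def _cleanup(self, exc_type=None, exc_val=None, exc_tb=None):
        # Exit in reverse order of entry
        for manager, _ in reversed(self.entered):
            try:
                await manager.__aexit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                print(f"Error during cleanup: {e}")
        self.entered.clear()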
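
The chained context manager above re-implements behaviour that the standard library already offers through `contextlib.AsyncExitStack`, which enters managers one by one and exits them in reverse order, including when a later manager fails to enter. The sketch below shows the idea; `fake_resource` and `use_resources` are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def fake_resource(name, log):
    """Hypothetical resource that records when it is opened and closed."""
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")


async def use_resources(log):
    async with AsyncExitStack() as stack:
        # Each manager is registered as it is entered; exits run in reverse order
        db = await stack.enter_async_context(fake_resource("db", log))
        cache = await stack.enter_async_context(fake_resource("cache", log))
        return [db, cache]


if __name__ == "__main__":
    events = []
    print(asyncio.run(use_resources(events)))
    print(events)
```

Because the stack passes the active exception to each `__aexit__`, inner managers can also inspect or suppress errors, which a hand-written chain has to handle explicitly.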

11.2 Performance and Memory Management

# 内存管理最佳实践
class AsyncMemoryManager:
    """异步内存管理"""
    
    @staticmethod
    async def process_large_dataset_efficiently(dataset):
        """高效处理大数据集"""
        chunk_size = 1000
        processed_count = 0
        
        for i in range(0, len(dataset), chunk_size):
            chunk = dataset[i:i + chunk_size]
            
            # 处理当前块
            await AsyncMemoryManager._process_chunk(chunk)
            processed_count += len(chunk)
            
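            # Periodically yield to the event loop. The explicit gc.collect() is
            # optional: it blocks the loop while it runs and only helps when
            # chunks leave reference cycles behind
            if processed_count % 10000 == 0:
                await asyncio.sleep(0)
                import gc
                gc.collect()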
        
        return processed_count
    
    @staticmethod
    async def _process_chunk(chunk):
        """处理数据块"""
        await asyncio.sleep(0.01)  # 模拟处理时间
        return [item * 2 for item in chunk]
    
    @staticmethod
    async def memory_efficient_file_processing(file_path):
        """内存高效的文件处理"""
        try:
            import aiofiles
            line_count = 0
            
            async with aiofiles.open(file_path, 'r') as file:
                async for line in file:
                    # 处理单行,而不是一次性加载整个文件
                    processed_line = await AsyncMemoryManager._process_line(line)
                    line_count += 1
                    
                    # 定期让出控制权
                    if line_count % 1000 == 0:
                        await asyncio.sleep(0)
            
            return line_count
        except ImportError:
            print("需要安装 aiofiles: pip install aiofiles")
            return 0
    
    @staticmethod
    async def _process_line(line):
        """处理单行"""
        return line.strip().upper()

# 资源清理管理
class AsyncResourceManager:
    """异步资源管理器"""
    
    def __init__(self):
        self.resources = []
        self.cleanup_tasks = []
    
    async def add_resource(self, resource, cleanup_func=None):
        """添加需要管理的资源"""
        self.resources.append((resource, cleanup_func))
    
    async def cleanup_all(self):
        """清理所有资源"""
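        import inspect
        for resource, cleanup_func in reversed(self.resources):
            try:
                if cleanup_func:
                    # Await only if the call returned an awaitable; this also covers
                    # functools.partial objects and async callables
                    outcome = cleanup_func(resource)
                    if inspect.isawaitable(outcome):
                        await outcome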
                print(f"✅ 清理资源: {resource}")
            except Exception as e:
                print(f"❌ 清理资源失败: {resource}, 错误: {e}")
        
        self.resources.clear()
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.cleanup_all()
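
For comparison, the standard library's `contextlib.AsyncExitStack` provides the same last-in, first-out cleanup behavior as the manager above. The following is a minimal sketch, not a replacement for the class; `close_connection`, `release_buffer`, and `demo_cleanup_order` are hypothetical names introduced only for illustration.

```python
import asyncio
from contextlib import AsyncExitStack


async def close_connection(name: str, log: list[str]) -> None:
    """Hypothetical async cleanup callback; records what it closed."""
    await asyncio.sleep(0)  # stand-in for real asynchronous teardown work
    log.append(f"closed {name}")


def release_buffer(name: str, log: list[str]) -> None:
    """Hypothetical synchronous cleanup callback."""
    log.append(f"released {name}")


async def demo_cleanup_order() -> list[str]:
    log: list[str] = []
    async with AsyncExitStack() as stack:
        # Callbacks run in reverse registration order when the block exits.
        stack.push_async_callback(close_connection, "db", log)
        stack.callback(release_buffer, "buffer", log)
        stack.push_async_callback(close_connection, "cache", log)
        log.append("working")
    return log
```

Calling `asyncio.run(demo_cleanup_order())` should show the work step first, followed by the cleanups in reverse order of registration. Sync and async callbacks can be mixed on the same stack, which mirrors the sync/async branch in `cleanup_all`.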

11.3 Debugging and Testing

# Debugging helpers for async code
import asyncio
import functools
import time

class AsyncDebugger:
    """Debugging helpers for async code."""
    
    @staticmethod
    def trace_coroutine(name=None):
        """Decorator that logs when a coroutine starts, finishes, or fails."""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                func_name = name or f"{func.__module__}.{func.__name__}"
                print(f"🔍 Started: {func_name}")
                # perf_counter is monotonic, so it is better suited to timing than time.time()
                start_time = time.perf_counter()
                
                try:
                    result = await func(*args, **kwargs)
                    elapsed = time.perf_counter() - start_time
                    print(f"✅ Finished: {func_name} (took {elapsed:.4f}s)")
                    return result
                except Exception as e:
                    elapsed = time.perf_counter() - start_time
                    print(f"❌ Failed: {func_name} (took {elapsed:.4f}s), error: {e}")
                    raise
            
            return wrapper
        return decorator
    
    @staticmethod
    async def monitor_task_status():
        """Print the currently active tasks every 5 seconds."""
        while True:
            tasks = [task for task in asyncio.all_tasks() if not task.done()]
            print(f"📊 Active tasks: {len(tasks)}")
            
            for i, task in enumerate(tasks[:5]):  # Show only the first 5
                print(f"  {i+1}. {task.get_name()}: {task}")
            
            await asyncio.sleep(5)
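
Besides hand-written tracing like the decorator above, asyncio has a built-in debug mode, enabled with `asyncio.run(..., debug=True)`. In debug mode, the event loop logs a warning on the `asyncio` logger for any step that blocks the loop longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below captures those warnings; `ListHandler`, `blocking_step`, and `find_slow_callbacks` are hypothetical names used only for this illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Hypothetical helper: collects formatted log messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_step():
    # time.sleep blocks the event loop; asyncio.sleep would not.
    time.sleep(0.2)


def find_slow_callbacks():
    """Run blocking_step in debug mode and return the slow-callback warnings."""
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocking_step(), debug=True)
    finally:
        logger.removeHandler(handler)
    return [message for message in handler.messages if "took" in message]
```

Debug mode adds overhead, so it is usually enabled during development or while diagnosing a problem rather than left on in production.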

# Testing helpers for async code
class AsyncTester:
    """Testing helpers for async code."""
    
    @staticmethod
    async def run_performance_test(coroutine_func, *args, iterations=100):
        """Run a coroutine function repeatedly, one call at a time, and report timings."""
        print(f"🧪 Starting performance test: {coroutine_func.__name__} ({iterations} iterations)")
        
        times = []
        errors = 0
        
        for i in range(iterations):
            start_time = time.perf_counter()
            try:
                await coroutine_func(*args)
                times.append(time.perf_counter() - start_time)
            except Exception as e:
                errors += 1
                print(f"❌ Iteration {i+1} failed: {e}")
        
        if times:
            avg_time = sum(times) / len(times)
            min_time = min(times)
            max_time = max(times)
            
            print(f"📈 Performance test results:")
            print(f"  Succeeded: {len(times)}/{iterations}")
            print(f"  Failed: {errors}/{iterations}")
            print(f"  Average time: {avg_time:.4f}s")
            print(f"  Shortest time: {min_time:.4f}s")
            print(f"  Longest time: {max_time:.4f}s")
        else:
            print("❌ All iterations failed")
    
    @staticmethod
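    async def stress_test(coroutine_func, concurrent_count=100):
        """Stress test: run many copies of a coroutine concurrently."""
        print(f"💪 Starting stress test: {concurrent_count} concurrent tasks")
        
        start_time = time.perf_counter()
        tasks = [asyncio.create_task(coroutine_func()) for _ in range(concurrent_count)]
        
        # With return_exceptions=True, gather returns failures as values instead of raising
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = time.perf_counter()
        
        # BaseException also covers asyncio.CancelledError
        successful = [r for r in results if not isinstance(r, BaseException)]
        failed = len(results) - len(successful)
        
        print(f"🏁 Stress test finished:")
        print(f"  Total time: {end_time - start_time:.2f}s")
        print(f"  Succeeded: {len(successful)}/{concurrent_count}")
        print(f"  Failed: {failed}/{concurrent_count}")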
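
The helpers above measure timing but do not assert on correctness. For that, the standard library's `unittest.IsolatedAsyncioTestCase` runs `async def` test methods on a fresh event loop per test. This is a minimal sketch under stated assumptions: `fetch_value` is a hypothetical coroutine standing in for real code under test, and `run_tests` is a hypothetical convenience wrapper.

```python
import asyncio
import unittest


async def fetch_value(delay: float) -> int:
    """Hypothetical coroutine under test; the sleep stands in for real I/O."""
    await asyncio.sleep(delay)
    return 42


class FetchValueTests(unittest.IsolatedAsyncioTestCase):
    async def test_returns_value(self):
        self.assertEqual(await fetch_value(0.01), 42)

    async def test_times_out(self):
        # wait_for cancels the coroutine and raises TimeoutError when the timeout expires.
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(fetch_value(1.0), timeout=0.01)


def run_tests() -> unittest.TestResult:
    """Load and run the test case programmatically, returning the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(FetchValueTests)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

In a real project these tests would normally be discovered with `python -m unittest` rather than run through a wrapper like `run_tests`.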


 
