A Practical Guide to Python Multithreading
What Is a Thread?
In an operating system, a thread is a unit of execution within a process and the basic unit of CPU scheduling and dispatch. Multithreaded programming means creating multiple threads in the same program so that they run concurrently, improving the program's throughput.
A thread is an independent flow of execution, which means a program can appear to do several things at once. In most Python 3 implementations, however, the threads do not truly run at the same time; they only appear to.
Key characteristics
• Concurrent, not parallel: because of the GIL (Global Interpreter Lock), only one thread per interpreter can execute Python bytecode at any given moment
• Good fit: I/O-bound tasks (file operations, network requests)
• Poor fit: CPU-bound tasks (use multiprocessing instead)
Threads vs. Processes
Threads within one process share the same memory space and are cheap to create and switch between, while processes each get their own interpreter and memory, which sidesteps the GIL at the cost of heavier startup and inter-process communication.
Why Multithreading Still Matters
The value of multithreading shows up in workloads that are not CPU-bound, above all I/O-bound tasks:
1. I/O-bound tasks: network requests, file reads and writes, database access, and so on; a thread releases the GIL while it waits on I/O (see the sketch after this list)
2. Simple concurrency: for example, listening for user input while processing data in the background
3. Avoid multithreading for CPU-bound work; use multiprocessing instead
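To make the I/O-bound case concrete, here is a minimal sketch, not from the original article; the simulated_io helper and its one-second sleep are illustrative stand-ins for a blocking call. Because each thread releases the GIL while it waits, five simulated waits finish in roughly the time of one.
import threading
import time

def simulated_io(name):
    # Stand-in for a blocking I/O call (network request, file read, ...)
    time.sleep(1)
    print(f"Task {name} done")

start = time.perf_counter()
threads = [threading.Thread(target=simulated_io, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Roughly 1 second in total instead of 5, because the waits overlap
print(f"Elapsed: {time.perf_counter() - start:.2f}s")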
Creating and Starting Threads
Method 1: Pass a function
import threading
import time
import logging

def thread_function(name):
    logging.info(f"Thread {name}: starting")
    time.sleep(2)
    logging.info(f"Thread {name}: finishing")

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        format="%(asctime)s: %(message)s",
        level=logging.INFO,
        datefmt="%H:%M:%S"
    )
    # Create a thread
    x = threading.Thread(target=thread_function, args=(1,))
    # Start the thread
    x.start()
    # Wait for the thread to finish
    x.join()
    logging.info("Main: all done")
Method 2: Subclass Thread
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        logging.info(f"Thread {self.name}: starting")
        time.sleep(2)
        logging.info(f"Thread {self.name}: finishing")

# Usage
thread = MyThread("Worker")
thread.start()
thread.join()
Creating Multiple Threads
threads = []
for index in range(3):
    logging.info(f"Main: create and start thread {index}")
    x = threading.Thread(target=thread_function, args=(index,))
    threads.append(x)
    x.start()

# Wait for all threads to finish
for index, thread in enumerate(threads):
    logging.info(f"Main: before joining thread {index}")
    thread.join()
    logging.info(f"Main: thread {index} done")
Daemon Threads
A daemon thread runs in the background and is killed abruptly when the program exits.
Characteristics
• The main program does not wait for daemon threads to finish
• Suited to background tasks (logging, heartbeat checks)
• Resources may not be cleaned up properly
Creating a daemon thread
# Option 1: set it in the constructor
x = threading.Thread(target=thread_function, args=(1,), daemon=True)

# Option 2: set the attribute before starting
x = threading.Thread(target=thread_function, args=(1,))
x.daemon = True
x.start()
Comparing the two
Non-daemon thread (the program waits for the thread to finish):
Main: before creating thread
Main: before running thread
Thread 1: starting
Main: wait for the thread to finish
Main: all done
Thread 1: finishing   # finishes after the main program
Daemon thread (the program does not wait):
Main: before creating thread
Main: before running thread
Thread 1: starting
Main: wait for the thread to finish
Main: all done
# Thread 1 is killed abruptly; no "finishing" message
Using ThreadPoolExecutor
ThreadPoolExecutor offers a more concise way to manage a pool of threads.
Basic usage
import concurrent.futures

def thread_function(name):
    logging.info(f"Thread {name}: starting")
    time.sleep(2)
    logging.info(f"Thread {name}: finishing")

if __name__ == "__main__":
    # Create a pool with 3 worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
Submitting a single task
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Submit the task
    future = executor.submit(thread_function, "Worker-1")
    # Wait for the result (optional)
    result = future.result()
Handling return values
def calculate(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit several tasks
    futures = [executor.submit(calculate, i) for i in range(5)]
    # Collect results as they complete
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Result: {result}")
Race Conditions
A race condition is one of the most common problems in multithreaded code. It occurs when several threads read and modify shared data at the same time.
A problematic example
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info(f"Thread {name}: starting update")
        # Read the value
        local_copy = self.value
        # Modify it
        local_copy += 1
        # Simulate a delay
        time.sleep(0.1)
        # Write it back
        self.value = local_copy
        logging.info(f"Thread {name}: finishing update")

# Test
database = FakeDatabase()
logging.info(f"Testing update. Starting value is {database.value}")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for index in range(2):
        executor.submit(database.update, index)
logging.info(f"Testing update. Ending value is {database.value}")
# Expected value: 2. Actual value: 1 (caused by the race condition)
Why does this happen?
1. Thread 0 reads value = 0 and computes local_copy = 1
2. While Thread 0 is sleeping, Thread 1 also reads value = 0 (not yet updated)
3. Thread 1 also computes local_copy = 1
4. Thread 0 writes back value = 1
5. Thread 1 also writes back value = 1, overwriting Thread 0's result
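The sleep in FakeDatabase only makes the race easy to reproduce; the same lost-update problem can occur without it whenever a read-modify-write on shared state is interleaved. A minimal sketch, not from the original article, that can end below the expected total (how often depends on the CPython version and thread switch interval):
import threading

counter = 0

def increment(times):
    global counter
    for _ in range(times):
        # Unsynchronized read-modify-write on shared state: not atomic
        counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected 400000, got {counter}")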
Synchronization Primitives
Python's threading module provides several synchronization primitives for avoiding race conditions.
1. Lock (mutex)
The most basic primitive: it guarantees that only one thread at a time is inside the critical section.
class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info(f"Thread {name}: starting update")
        # The context manager acquires and releases the lock automatically
        with self._lock:
            logging.info(f"Thread {name} has lock")
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
        logging.info(f"Thread {name}: finishing update")

# Usage
database = FakeDatabase()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for index in range(2):
        executor.submit(database.locked_update, index)
# Now the result is correct: value = 2
2. RLock (reentrant lock)
An RLock lets the same thread acquire the same lock more than once.
lock = threading.RLock()

def outer():
    with lock:
        print("Outer function acquired lock")
        inner()

def inner():
    with lock:  # acquiring the lock again is allowed
        print("Inner function acquired lock")

outer()  # works; with a plain Lock this would deadlock
3. Semaphore
A semaphore limits how many threads can access a resource at the same time.
# Allow at most 2 threads to access the database at once
semaphore = threading.Semaphore(2)

def access_database(name):
    with semaphore:
        logging.info(f"Thread {name}: accessing database")
        time.sleep(2)
        logging.info(f"Thread {name}: done")

# Start 5 threads, but only 2 can access the database at any time
threads = []
for i in range(5):
    t = threading.Thread(target=access_database, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
4. Event
An Event provides simple signalling between threads.
event = threading.Event()

def wait_for_event(name):
    logging.info(f"Thread {name}: waiting for event")
    event.wait()  # block until the event is set
    logging.info(f"Thread {name}: event occurred")

def set_event():
    logging.info("Setting event in 2 seconds")
    time.sleep(2)
    event.set()  # fire the event

# Start the waiting threads
for i in range(3):
    t = threading.Thread(target=wait_for_event, args=(i,))
    t.start()

# Set the event
set_event()
5. Condition (condition variable)
A Condition is for more elaborate coordination between threads.
condition = threading.Condition()
items = []

def consumer():
    with condition:
        while len(items) == 0:
            logging.info("Consumer waiting")
            condition.wait()  # wait until notified
        item = items.pop(0)
        logging.info(f"Consumer got: {item}")

def producer():
    with condition:
        item = "data"
        items.append(item)
        logging.info(f"Producer added: {item}")
        condition.notify()  # wake up a waiting thread

# Start the consumer first
t1 = threading.Thread(target=consumer)
t1.start()
time.sleep(1)

# Then let the producer add data
t2 = threading.Thread(target=producer)
t2.start()

t1.join()
t2.join()
6. Barrier
A Barrier makes a group of threads wait for one another at a common point.
import random

barrier = threading.Barrier(3)  # requires 3 threads

def worker(name):
    logging.info(f"Thread {name}: doing work")
    time.sleep(random.random())
    logging.info(f"Thread {name}: waiting at barrier")
    barrier.wait()  # wait for the other threads
    logging.info(f"Thread {name}: passed barrier")

# Start 3 threads
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    t.start()
The Producer-Consumer Pattern
Implementation with Queue
queue.Queue is a thread-safe FIFO queue, which makes it a natural fit for producer-consumer scenarios.
import queue
import random

def producer(pipeline, event):
    """Producer: generate data and put it on the queue."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info(f"Producer got message: {message}")
        pipeline.put(message)
    logging.info("Producer received EXIT event. Exiting")

def consumer(pipeline, event):
    """Consumer: take data off the queue and process it."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get()
        logging.info(f"Consumer storing message: {message} (queue size={pipeline.qsize()})")
    logging.info("Consumer received EXIT event. Exiting")

if __name__ == "__main__":
    # Create the queue (at most 10 items)
    pipeline = queue.Queue(maxsize=10)
    # Create the event used to stop the threads
    event = threading.Event()
    # Start the producer and the consumer
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        # Let them run briefly, then stop
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
Sample output
Producer got message: 32
Producer got message: 51
Consumer storing message: 32 (queue size=0)
Producer got message: 94
Consumer storing message: 51 (queue size=1)
Main: about to set event
Producer received EXIT event. Exiting
Consumer storing message: 94 (queue size=0)
Consumer received EXIT event. Exiting
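One caveat with this shutdown scheme: if the event is set while the queue happens to be empty, the consumer can block forever inside pipeline.get(). A common workaround, sketched here as an assumption rather than part of the original example, is a sentinel value that the producer pushes last; the consumer exits when it sees it.
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker

def producer_with_sentinel(pipeline):
    for message in range(5):
        pipeline.put(message)
    pipeline.put(SENTINEL)  # tell the consumer nothing more is coming

def consumer_with_sentinel(pipeline):
    while True:
        message = pipeline.get()
        if message is SENTINEL:
            break
        print(f"Consumed {message}")

pipeline = queue.Queue(maxsize=10)
t1 = threading.Thread(target=producer_with_sentinel, args=(pipeline,))
t2 = threading.Thread(target=consumer_with_sentinel, args=(pipeline,))
t1.start()
t2.start()
t1.join()
t2.join()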
The GIL and Performance
What is the GIL?
The Global Interpreter Lock (GIL) is a mechanism in CPython that ensures only one thread executes Python bytecode at any given moment.
The impact of the GIL
# CPU-bound task (limited by the GIL)
def cpu_bound(n):
    return sum(i * i for i in range(n))

# I/O-bound task (not limited by the GIL)
def io_bound():
    time.sleep(1)  # simulate waiting on I/O
    return "done"
Performance comparison
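The original comparison table is not reproduced here; the following is a minimal timing sketch, with an illustrative workload size chosen only for demonstration. It shows the expected pattern: running the cpu_bound function above in threads gives little or no speedup under the GIL, while running io_bound in threads does.
import concurrent.futures
import time

def cpu_bound(n):            # same function as above
    return sum(i * i for i in range(n))

def io_bound():              # same function as above
    time.sleep(1)
    return "done"

def run_in_threads(fn, args_list):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(fn, args_list))

def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

N = 5_000_000  # illustrative workload size

timed("CPU-bound, sequential", lambda: [cpu_bound(N) for _ in range(4)])
timed("CPU-bound, 4 threads ", lambda: run_in_threads(cpu_bound, [N] * 4))
timed("I/O-bound, sequential", lambda: [io_bound() for _ in range(4)])
timed("I/O-bound, 4 threads ", lambda: run_in_threads(lambda _: io_bound(), range(4)))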
The free-threaded build in Python 3.13
Starting with Python 3.13, CPython can be built with the GIL disabled (the free-threaded build), allowing threads to run truly in parallel:
# On Python 3.13+, check whether the GIL is actually in effect
import sys
print(sys._is_gil_enabled())  # False on a free-threaded build running without the GIL
Best Practices
1. Use context managers
Recommended:
with lock:
    # critical section
    shared_data += 1
Not recommended:
lock.acquire()
try:
    shared_data += 1
finally:
    lock.release()
2. Use timeouts
# Avoid waiting forever
if lock.acquire(timeout=10):
    try:
        # critical section
        pass
    finally:
        lock.release()
else:
    logging.error("Failed to acquire lock")
3. Avoid deadlocks
A deadlock example (do not do this):
lock = threading.Lock()
lock.acquire()
lock.acquire()  # deadlock! waits forever
Solution: use an RLock, or restructure the code
lock = threading.RLock()
lock.acquire()
lock.acquire()  # works
lock.release()
lock.release()
4. Use daemon threads appropriately
# Background tasks can be daemon threads
logging_thread = threading.Thread(target=log_worker, daemon=True)
logging_thread.start()

# Do not make important work a daemon thread
data_thread = threading.Thread(target=save_data, daemon=False)
data_thread.start()
data_thread.join()  # make sure the data is actually saved
5. Control the number of threads
import os

# Derive the thread count from the number of CPU cores
# For I/O-bound work: roughly 2-4 x the number of cores
max_workers = os.cpu_count() * 2
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    executor.map(io_task, data_list)
6. Handle exceptions
def safe_worker(name):
    try:
        # the actual work
        risky_operation()
    except Exception as e:
        logging.error(f"Thread {name} error: {e}")
    finally:
        # clean-up code
        cleanup()

thread = threading.Thread(target=safe_worker, args=("Worker-1",))
thread.start()
7. Use the threading utility functions
# Get the current thread
current = threading.current_thread()
print(f"Current thread: {current.name}")

# List all live threads
for thread in threading.enumerate():
    print(f"Active thread: {thread.name}")

# Get the thread identifier
thread_id = threading.get_ident()
print(f"Thread ID: {thread_id}")

# Get the native (OS-level) thread ID
native_id = threading.get_native_id()
print(f"Native Thread ID: {native_id}")
Common Patterns
Pattern 1: Simple concurrent tasks
def process_item(item):
    # process a single item
    return result

items = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(process_item, items)
    for result in results:
        print(result)
Pattern 2: Background task
def background_task():
    while True:
        # do the background work
        time.sleep(60)

daemon = threading.Thread(target=background_task, daemon=True)
daemon.start()
Pattern 3: Timed (delayed) task
def delayed_task():
    print("Task executed after delay")

timer = threading.Timer(5.0, delayed_task)
timer.start()
# The timer can be cancelled
# timer.cancel()
Pattern 4: Batch processing with a thread pool
def batch_process(batch):
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_item, item) for item in batch]
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results
Debugging Tips
1. Enable detailed logging
logging.basicConfig(
    format="%(asctime)s - %(threadName)s - %(levelname)s - %(message)s",
    level=logging.DEBUG
)
2. Name your threads
thread = threading.Thread(target=worker, name="Worker-1")
thread.start()
3. Check thread state
if thread.is_alive():
    print("Thread is still running")
else:
    print("Thread has finished")
4. Catch exceptions through ThreadPoolExecutor
def task_with_exception():
    raise ValueError("Something went wrong")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task_with_exception)
    try:
        result = future.result()
    except ValueError as e:
        print(f"Caught exception: {e}")
When Should You Use Threads?
Good use cases
✅ I/O-bound tasks
• Network requests (API calls, crawlers)
• File reads and writes
• Database operations
✅ Concurrency that needs shared memory (see the sketch after this list)
• A shared cache
• Live data updates
✅ Responsive UIs
• Background work in GUI applications
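For the shared-memory case, here is a minimal sketch of a thread-safe cache; the ThreadSafeCache class is an illustrative example, not taken from the original article. It is just a dictionary guarded by a Lock so concurrent readers and writers never observe partial updates.
import threading

class ThreadSafeCache:
    """A tiny dict-based cache guarded by a lock (illustrative sketch)."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

cache = ThreadSafeCache()

def worker(n):
    cache.set(f"key-{n}", n * n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache.get("key-3"))  # 9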
Poor use cases
❌ CPU-bound tasks
• Numerical computation
• Image/video processing
• Machine-learning training
→ Use multiprocessing instead
❌ Work that needs true parallel execution
→ Use multiprocessing (asyncio, like threading, is concurrent rather than parallel)
❌ Simple asynchronous I/O
→ Consider asyncio (more lightweight)
Alternatives
1. asyncio (asynchronous I/O)
Suited to very large numbers of I/O operations; lighter-weight than threads:
import asyncio

async def fetch_data(url):
    # fetch the data asynchronously
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    tasks = [fetch_data(f"url-{i}") for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
2. multiprocessing
Suited to CPU-bound work:
from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(cpu_intensive, [10000000] * 4)
Summary
Key takeaways
1. Threads suit I/O-bound tasks, not CPU-bound tasks
2. Use ThreadPoolExecutor to simplify thread management
3. Use synchronization primitives such as Lock and Semaphore to prevent race conditions
4. queue.Queue is thread-safe and fits the producer-consumer pattern well
5. The GIL limits CPU-bound performance but does not hurt I/O-bound work
6. Always manage locks with context managers
7. Set sensible timeouts to avoid deadlocks
8. Daemon threads suit background tasks, but their resources may not be cleaned up properly
Quick reference
# Create a thread
t = threading.Thread(target=func, args=(arg,))
t.start()
t.join()

# Thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(func, data)

# Lock
with lock:
    ...  # critical section

# Queue
queue.put(item)
item = queue.get()

# Event
event.set()
event.wait()

# Semaphore
with semaphore:
    ...  # limits concurrency
References
• Python documentation, threading module: https://docs.python.org/3/library/threading.html
• Real Python, threading tutorial: https://realpython.com/intro-to-python-threading/
• PEP 703, Making the Global Interpreter Lock Optional in CPython: https://peps.python.org/pep-0703/
Date written: 2024
Applies to: Python 3.6+
Note: Python 3.13 introduces a free-threaded build that can disable the GIL for true parallelism