目 录CONTENT

文章目录

Python多线程实践指南,IO密集任务的好帮手

Administrator
2025-10-16 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

 

字数 3202,阅读大约需 17 分钟

Python多线程实践指南,


什么是线程

在操作系统中,线程是进程内的一个执行单元,它是CPU调度和分派的基本单位。多线程编程是指在同一程序中,通过创建多个线程,使它们并行执行,以提高程序的运行效率。

线程是独立的执行流。这意味着程序可以同时执行多个任务。但在大多数Python 3实现中,不同的线程并不是真正同时执行的——它们只是看起来像是同时执行。

关键特点

  • 并发但不是并行:由于GIL(全局解释器锁)的存在,同一解释器时刻同一个解释器只有一个线程可以执行Python字节码

  • 适用场景:I/O密集型任务(文件操作、网络请求)

  • 不适用场景:CPU密集型任务(建议使用multiprocessing)

线程 vs 进程

特性

线程

进程

内存空间

共享同一进程的内存

独立的内存空间

创建开销

GIL影响

受限于GIL

不受GIL限制

适用场景

I/O密集型

CPU密集型


为什么仍需要多线程

多线程的价值体现在非 CPU 密集型场景中,尤其是I/O 密集型任务:

  1. 1. IO密集任务:网络请求、文件读写、数据库交互等,线程会主动释放GIL

  2. 2. 简单并发逻辑:程序监听用户的输入请求,处理数据等;

  3. 3. 禁止在CPU密集任务里使用多线程

创建和启动线程

方法1:传递函数


    
    
    
  import threading
import time
import logging

def thread_function(name):
    logging.info(f"Thread {name}: starting")
    time.sleep(2)
    logging.info(f"Thread {name}: finishing")

if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(
        format="%(asctime)s: %(message)s",
        level=logging.INFO,
        datefmt="%H:%M:%S"
    )
    
    # 创建线程
    x = threading.Thread(target=thread_function, args=(1,))
    
    # 启动线程
    x.start()
    
    # 等待线程完成
    x.join()
    
    logging.info("Main: all done")

方法2:继承Thread类


    
    
    
  class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    
    def run(self):
        logging.info(f"Thread {self.name}: starting")
        time.sleep(2)
        logging.info(f"Thread {self.name}: finishing")

# 使用
thread = MyThread("Worker")
thread.start()
thread.join()

创建多个线程


    
    
    
  threads = []
for index in range(3):
    logging.info(f"Main: create and start thread {index}")
    x = threading.Thread(target=thread_function, args=(index,))
    threads.append(x)
    x.start()

# 等待所有线程完成
for index, thread in enumerate(threads):
    logging.info(f"Main: before joining thread {index}")
    thread.join()
    logging.info(f"Main: thread {index} done")

守护线程

守护线程是在后台运行的线程,当程序退出时会立即终止。

特点

  • • 主程序不会等待守护线程完成

  • • 适用于后台任务(如日志记录、心跳检测)

  • • 资源可能无法正常清理

创建守护线程


    
    
    
  # 方法1:构造函数设置
x = threading.Thread(target=thread_function, args=(1,), daemon=True)

# 方法2:属性设置
x = threading.Thread(target=thread_function, args=(1,))
x.daemon = True
x.start()

示例对比

非守护线程(程序等待线程完成):


    
    
    
  Main: before creating thread
Main: before running thread
Thread 1: starting
Main: wait for the thread to finish
Main: all done
Thread 1: finishing  # 在主程序后完成

守护线程(程序不等待):


    
    
    
  Main: before creating thread
Main: before running thread
Thread 1: starting
Main: wait for the thread to finish
Main: all done
# Thread 1 被强制终止,没有finishing消息

使用ThreadPoolExecutor

ThreadPoolExecutor提供了更简洁的方式来管理线程池。

基本用法


    
    
    
  import concurrent.futures

def thread_function(name):
    logging.info(f"Thread {name}: starting")
    time.sleep(2)
    logging.info(f"Thread {name}: finishing")

if __name__ == "__main__":
    # 创建包含3个工作线程的线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

提交单个任务


    
    
    
  with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # 提交任务
    future = executor.submit(thread_function, "Worker-1")
    
    # 等待结果(可选)
    result = future.result()

处理返回值


    
    
    
  def calculate(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务
    futures = [executor.submit(calculate, i) for i in range(5)]
    
    # 获取结果
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Result: {result}")

竞态条件

竞态条件是多线程编程中最常见的问题之一,发生在多个线程同时访问和修改共享数据时。

问题示例


    
    
    
  class FakeDatabase:
    def __init__(self):
        self.value = 0
    
    def update(self, name):
        logging.info(f"Thread {name}: starting update")
        # 读取值
        local_copy = self.value
        # 修改值
        local_copy += 1
        # 模拟延迟
        time.sleep(0.1)
        # 写回值
        self.value = local_copy
        logging.info(f"Thread {name}: finishing update")

# 测试
database = FakeDatabase()
logging.info(f"Testing update. Starting value is {database.value}")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for index in range(2):
        executor.submit(database.update, index)

logging.info(f"Testing update. Ending value is {database.value}")
# 期望值:2,实际值:1(竞态条件导致)

为什么会发生?

  1. 1. Thread 0 读取 value = 0,加1得到 local_copy = 1

  2. 2. Thread 0 休眠时,Thread 1 也读取 value = 0(尚未更新)

  3. 3. Thread 1 也计算得到 local_copy = 1

  4. 4. Thread 0 写回 value = 1

  5. 5. Thread 1 也写回 value = 1(覆盖了Thread 0的结果)


同步原语

Python的threading模块提供了多种同步原语来避免竞态条件。

1. Lock(互斥锁)

最基本的同步原语,确保同一时刻只有一个线程访问临界区。


    
    
    
  class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
    
    def locked_update(self, name):
        logging.info(f"Thread {name}: starting update")
        
        # 使用上下文管理器自动获取和释放锁
        with self._lock:
            logging.info(f"Thread {name} has lock")
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
        
        logging.info(f"Thread {name}: finishing update")

# 使用
database = FakeDatabase()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    for index in range(2):
        executor.submit(database.locked_update, index)

# 现在结果正确:value = 2

2. RLock(可重入锁)

允许同一线程多次获取同一个锁。


    
    
    
  lock = threading.RLock()

def outer():
    with lock:
        print("Outer function acquired lock")
        inner()

def inner():
    with lock:  # 可以再次获取锁
        print("Inner function acquired lock")

outer()  # 正常工作,不会死锁

3. Semaphore(信号量)

限制同时访问资源的线程数量。


    
    
    
  # 限制同时只有2个线程可以访问数据库
semaphore = threading.Semaphore(2)

def access_database(name):
    with semaphore:
        logging.info(f"Thread {name}: accessing database")
        time.sleep(2)
        logging.info(f"Thread {name}: done")

# 创建5个线程,但同时只有2个能访问数据库
threads = []
for i in range(5):
    t = threading.Thread(target=access_database, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

4. Event(事件)

用于线程间的简单信号通信。


    
    
    
  event = threading.Event()

def wait_for_event(name):
    logging.info(f"Thread {name}: waiting for event")
    event.wait()  # 阻塞直到事件被设置
    logging.info(f"Thread {name}: event occurred")

def set_event():
    logging.info("Setting event in 2 seconds")
    time.sleep(2)
    event.set()  # 触发事件

# 启动等待线程
for i in range(3):
    t = threading.Thread(target=wait_for_event, args=(i,))
    t.start()

# 设置事件
set_event()

5. Condition(条件变量)

用于复杂的线程同步场景。


    
    
    
  condition = threading.Condition()
items = []

def consumer():
    with condition:
        while len(items) == 0:
            logging.info("Consumer waiting")
            condition.wait()  # 等待通知
        item = items.pop(0)
        logging.info(f"Consumer got: {item}")

def producer():
    with condition:
        item = "data"
        items.append(item)
        logging.info(f"Producer added: {item}")
        condition.notify()  # 通知等待的线程

# 先启动消费者
t1 = threading.Thread(target=consumer)
t1.start()

time.sleep(1)

# 然后生产者添加数据
t2 = threading.Thread(target=producer)
t2.start()

t1.join()
t2.join()

6. Barrier(栅栏)

让多个线程在某个点同步。


    
    
    
  barrier = threading.Barrier(3)  # 需要3个线程

def worker(name):
    logging.info(f"Thread {name}: doing work")
    time.sleep(random.random())
    
    logging.info(f"Thread {name}: waiting at barrier")
    barrier.wait()  # 等待其他线程
    
    logging.info(f"Thread {name}: passed barrier")

# 启动3个线程
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

生产者-消费者模式

使用Queue实现

queue.Queue是线程安全的FIFO队列,非常适合生产者-消费者场景。


    
    
    
  import queue
import random

def producer(pipeline, event):
    """生产者:生成数据放入队列"""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info(f"Producer got message: {message}")
        pipeline.put(message)
    
    logging.info("Producer received EXIT event. Exiting")

def consumer(pipeline, event):
    """消费者:从队列取数据处理"""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get()
        logging.info(f"Consumer storing message: {message} (queue size={pipeline.qsize()})")
    
    logging.info("Consumer received EXIT event. Exiting")

if __name__ == "__main__":
    # 创建队列(最多10个元素)
    pipeline = queue.Queue(maxsize=10)
    
    # 创建事件用于停止线程
    event = threading.Event()
    
    # 启动生产者和消费者
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        
        # 运行一段时间后停止
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

输出示例


    
    
    
  Producer got message: 32
Producer got message: 51
Consumer storing message: 32 (queue size=0)
Producer got message: 94
Consumer storing message: 51 (queue size=1)
Main: about to set event
Producer received EXIT event. Exiting
Consumer storing message: 94 (queue size=0)
Consumer received EXIT event. Exiting

GIL和性能考量

什么是GIL?

全局解释器锁(GIL)是CPython中的一个机制,它确保同一时刻只有一个线程执行Python字节码。

GIL的影响


    
    
    
  # CPU密集型任务(受GIL限制)
def cpu_bound(n):
    return sum(i * i for i in range(n))

# I/O密集型任务(不受GIL限制)
def io_bound():
    time.sleep(1)  # 模拟I/O等待
    return "done"

性能对比

任务类型

单线程

多线程

多进程

CPU密集型

1x

~1x(无提升)

Nx(N=核心数)

I/O密集型

1x

Nx(显著提升)

Nx

Python 3.13的自由线程构建

从Python 3.13开始,可以通过编译选项禁用GIL,实现真正的并行执行:


    
    
    
  # 自由线程构建中
import sys
print(sys.flags.thread_inherit_context)  # 检查是否启用

最佳实践

1. 使用上下文管理器

推荐


    
    
    
  with lock:
    # 临界区代码
    shared_data += 1

不推荐


    
    
    
  lock.acquire()
try:
    shared_data += 1
finally:
    lock.release()

2. 使用超时


    
    
    
  # 避免无限等待
if lock.acquire(timeout=10):
    try:
        # 临界区代码
        pass
    finally:
        lock.release()
else:
    logging.error("Failed to acquire lock")

3. 避免死锁

死锁示例(不要这样做):


    
    
    
  lock = threading.Lock()
lock.acquire()
lock.acquire()  # 死锁!永远等待

解决方案:使用RLock或重新设计代码


    
    
    
  lock = threading.RLock()
lock.acquire()
lock.acquire()  # 正常工作
lock.release()
lock.release()

4. 合理使用守护线程


    
    
    
  # 后台任务使用守护线程
logging_thread = threading.Thread(target=log_worker, daemon=True)
logging_thread.start()

# 重要任务不要使用守护线程
data_thread = threading.Thread(target=save_data, daemon=False)
data_thread.start()
data_thread.join()  # 确保数据保存完成

5. 线程数量控制


    
    
    
  import os

# 根据CPU核心数确定线程数
# I/O密集型:CPU核心数 × 2-4
max_workers = os.cpu_count() * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    executor.map(io_task, data_list)

6. 异常处理


    
    
    
  def safe_worker(name):
    try:
        # 工作代码
        risky_operation()
    except Exception as e:
        logging.error(f"Thread {name} error: {e}")
    finally:
        # 清理代码
        cleanup()

thread = threading.Thread(target=safe_worker, args=("Worker-1",))
thread.start()

7. 使用threading实用函数


    
    
    
  # 获取当前线程
current = threading.current_thread()
print(f"Current thread: {current.name}")

# 获取所有活动线程
for thread in threading.enumerate():
    print(f"Active thread: {thread.name}")

# 获取线程标识符
thread_id = threading.get_ident()
print(f"Thread ID: {thread_id}")

# 获取原生线程ID(操作系统级别)
native_id = threading.get_native_id()
print(f"Native Thread ID: {native_id}")

常用模式总结

模式1:简单并发任务


    
    
    
  def process_item(item):
    # 处理单个项目
    return result

items = [1, 2, 3, 4, 5]

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(process_item, items)
    
for result in results:
    print(result)

模式2:后台任务


    
    
    
  def background_task():
    while True:
        # 执行后台工作
        time.sleep(60)

daemon = threading.Thread(target=background_task, daemon=True)
daemon.start()

模式3:定时任务


    
    
    
  def delayed_task():
    print("Task executed after delay")

timer = threading.Timer(5.0, delayed_task)
timer.start()

# 可以取消定时器
# timer.cancel()

模式4:线程池批处理


    
    
    
  def batch_process(batch):
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_item, item) for item in batch]
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results

调试技巧

1. 启用详细日志


    
    
    
  logging.basicConfig(
    format="%(asctime)s - %(threadName)s - %(levelname)s - %(message)s",
    level=logging.DEBUG
)

2. 线程命名


    
    
    
  thread = threading.Thread(target=worker, name="Worker-1")
thread.start()

3. 检查线程状态


    
    
    
  if thread.is_alive():
    print("Thread is still running")
else:
    print("Thread has finished")

4. 使用ThreadPoolExecutor捕获异常


    
    
    
  def task_with_exception():
    raise ValueError("Something went wrong")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task_with_exception)
    try:
        result = future.result()
    except ValueError as e:
        print(f"Caught exception: {e}")

何时使用线程?

适合使用线程的场景

I/O密集型任务

  • • 网络请求(API调用、爬虫)

  • • 文件读写

  • • 数据库操作

需要共享内存的并发

  • • 共享缓存

  • • 实时数据更新

响应式UI

  • • GUI应用中的后台任务

不适合使用线程的场景

CPU密集型任务

  • • 数值计算

  • • 图像/视频处理

  • • 机器学习训练
    → 改用multiprocessing

需要真正并行执行的任务
→ 考虑multiprocessingasyncio

简单的异步I/O
→ 考虑asyncio(更轻量)


替代方案

1. asyncio(异步I/O)

适合大量I/O操作,比线程更轻量:


    
    
    
  import asyncio

async def fetch_data(url):
    # 异步获取数据
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    tasks = [fetch_data(f"url-{i}") for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

2. multiprocessing(多进程)

适合CPU密集型任务:


    
    
    
  from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        results = pool.map(cpu_intensive, [10000000] * 4)

总结

关键要点

  1. 1. 线程适合I/O密集型任务,不适合CPU密集型任务

  2. 2. 使用ThreadPoolExecutor简化线程管理

  3. 3. 使用Lock、Semaphore等同步原语防止竞态条件

  4. 4. queue.Queue是线程安全的,适合生产者-消费者模式

  5. 5. GIL限制了CPU密集型任务的性能,但不影响I/O任务

  6. 6. 始终使用上下文管理器管理锁

  7. 7. 合理设置超时避免死锁

  8. 8. 守护线程适合后台任务,但资源可能无法正常清理

快速参考


    
    
    
  # 创建线程
t = threading.Thread(target=func, args=(arg,))
t.start()
t.join()

# 线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(func, data)

# 锁
with lock:
    # 临界区

# 队列
queue.put(item)
item = queue.get()

# 事件
event.set()
event.wait()

# 信号量
with semaphore:
    # 限制并发数

参考资源

  • Python官方文档 - threading模块[1]

  • Real Python - Threading教程[2]

  • PEP 703 - Making the GIL Optional[3]


编写日期:2024年
适用版本:Python 3.6+
特别说明:Python 3.13引入了自由线程构建,可禁用GIL实现真正的并行

引用链接

[1] Python官方文档 - threading模块: https://docs.python.org/3/library/threading.html
[2] Real Python - Threading教程: https://realpython.com/intro-to-python-threading/
[3] PEP 703 - Making the GIL Optional: https://peps.python.org/pep-0703/

 

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区