字数 4840,阅读大约需 25 分钟
Python并行编程:multiprocessing
什么是multiprocessing
绕过GIL全局解释器锁的限制,使用子进程并发执行,充分利用现代多处理器;
主要特点和限制
Availability: not Android, not iOS, not WASI. 此模块在 移动平台 或 WebAssembly 平台 上不受支持。
概述
multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 POSIX 和 Windows 上均可运行。
multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
核心资料
multiprocessing --- 基于进程的并行 — Python 3.14.0 文档[1]
底层操作系统控制着新进程的创建方式。在某些系统上,这可能需要生成一个新进程,而在另一些系统上,则可能需要对进程进行分叉。Python中用于创建新进程的特定于操作系统的方法无需我们担心,因为它由你安装的Python解释器进行管理
论坛:Multiprocessing in Python: The Complete Guide : r/programming[2]
Python Multiprocessing: The Complete Guide - Super Fast Python[3]
完整学习多处理器,请务必阅读上面的文章!!!
Python进程
每个Python程序都是一个进程,并且有一个名为主线程的默认线程,用于执行程序指令。实际上,每个进程都是执行Python指令(Python字节码)的Python解释器的一个实例,其级别比你在Python程序中输入的代码稍低。2组基本概念:
1. Concurrent: Code that can be executed out of order.并发:可以无序执行的代码。
2. Parallel: Capability to execute code simultaneously.并行:同时执行代码的能力。
进程和线程
线程始终存在于进程中,它代表了指令或代码的执行方式。一个进程至少会有一个线程,称为主线程。我们在该进程内创建的任何额外线程都将属于该进程。Python解释器的实例至少有一个名为主线程的线程。Python进程中的执行线程,例如主线程或新线程。
# a target function that does something
def work()
# do something...
# create a thread to execute the work() function
thread = Thread(target=work)
# start the thread
thread.start()
Thread vs Process in Python - Super Fast Python[4]
父进程和子进程
创建子进程的进程称为父进程。一个子进程永远只有一个父进程。子进程也可称为 subprocess,因为它是由另一个进程创建的,而非由操作系统直接创建。话虽如此,所有进程的创建和管理都是由底层操作系统处理的。
创建子进程的3种方式:fork、spawn和fork server。multiprocessing --- 基于进程的并行 — Python 3.14.0 文档[1]
Run Code 示例演示
参数可以指定为元组并传递给multiprocessing.Process类构造函数的“args”参数,或者作为字典传递给“kwargs”参数。
process = multiprocessing.Process(target=task, args=(arg1, arg2))
process.start()
将创建一个新的Python解释器实例,并在新进程中创建一个新线程来执行我们的目标函数。
限制🚫:我们无法精确控制进程何时执行,也无法控制哪个CPU核心会执行该进程。这两项都是由底层操作系统负责处理的低级任务。
# SuperFastPython.com
# example of running a function in another process
from time import sleep
from multiprocessing import Process
# a custom function that blocks for a moment
def task():
# block for a moment
sleep(1)
# display a message
print('This is from another process')
# entry point
if __name__ == '__main__':
# create a process
process = Process(target=task)
# run the process
process.start()
# wait for the process to finish
print('Waiting for the process...')
process.join()
更多完整学习路线
1. Python Processes(Python进程)
• What Are Processes(什么是进程)
• Thread vs Process(线程与进程)
• Life-Cycle of a Process(进程的生命周期)
• Child vs Parent Process(子进程与父进程)
2. Run a Function in a Process(在进程中运行函数)
• How to Run a Function In a Process(如何在进程中运行函数)
• Example of Running a Function in a Process(示例)
• Example of Running a Function in a Process With Arguments(带参数的示例)
3. Extend the Process Class(扩展Process类)
• How to Extend the Process Class(如何扩展Process类)
• Example of Extending the Process Class(示例)
• Example of Extending the Process Class and Returning Values(返回值示例)
4. Process Start Methods(进程启动方法)
• What is a Start Method(什么是启动方法)
• How to Change The Start Method(如何更改启动方法)
• How to Set Start Method Via Context(通过上下文设置启动方法)
5. Process Instance Attributes(进程实例属性)
• Query Process Name(查询进程名称)
• Query Process Daemon(查询守护进程)
• Query Process PID(查询进程PID)
• Query Process Alive(查询进程存活状态)
• Query Process Exit Code(查询进程退出码)
6. Configure Processes(配置进程)
• How to Configure Process Name(如何配置进程名称)
• How to Configure a Daemon Process(如何配置守护进程)
7. Main Process(主进程)
• What is the Main Process(什么是主进程)
• How Can the Main Process Be Identified(如何识别主进程)
• How to Get the Main Process(如何获取主进程)
• What is the Name of the Main Process(主进程的名称)
8. Process Utilities(进程工具)
• Active Child Processes(活动子进程)
• Get The Number of CPU Cores(获取CPU核心数)
• The Current Process(当前进程)
• The Parent Process(父进程)
9. Process Mutex Lock(进程互斥锁)
• What is a Mutual Exclusion Lock(什么是互斥锁)
• How to Use a Mutex Lock(如何使用互斥锁)
• Example of Using a Mutex Lock(示例)
10. Process Reentrant Lock(进程可重入锁)
• What is a Reentrant Lock(什么是可重入锁)
• How to Use the Reentrant Lock(如何使用可重入锁)
• Example of Using a Reentrant Lock(示例)
11. Process Condition Variable(进程条件变量)
• What is a Process Condition Variable(什么是条件变量)
• How to Use a Condition Variable(如何使用条件变量)
• Example of Wait and Notify With a Condition Variable(等待和通知示例)
12. Process Semaphore(进程信号量)
• What is a Semaphore(什么是信号量)
• How to Use a Semaphore(如何使用信号量)
• Example of Using a Semaphore(示例)
13. Process Event(进程事件)
• How to Use an Event Object(如何使用事件对象)
• Example of Using an Event Object(示例)
14. Process Barrier(进程屏障)
• What is a Barrier(什么是屏障)
• How to Use a Barrier(如何使用屏障)
• Example of Using a Process Barrier(示例)
15. Python Multiprocessing Best Practices(最佳实践)
• Tip 1: Use Context Managers(使用上下文管理器)
• Tip 2: Use Timeouts When Waiting(等待时使用超时)
• Tip 3: Use Main Module Idiom(使用主模块习惯用法)
• Tip 4: Use Shared ctypes(使用共享ctypes)
• Tip 5: Use Pipes and Queues(使用管道和队列)
16. Python Multiprocessing Common Errors(常见错误)
• Error 1: RuntimeError Starting New Processes(启动新进程的运行时错误)
• Error 2: print() Does Not Work In Child Processes(print()在子进程中不工作)
• Error 3: Adding Attributes to Classes that Extend Process(向扩展Process的类添加属性)
17. Python Multiprocessing Common Questions(常见问题)
• How to Safely Stop a Process?(如何安全停止进程)
• How to Kill a Process?(如何杀死进程)
教程代码
# -*- coding: utf-8 -*-
"""
Python multiprocessing 模块使用教程
multiprocessing 是 Python 中用于实现多进程编程的模块,它允许程序创建并管理多个独立的进程。
每个进程都有自己独立的内存空间,因此它们之间的数据共享需要特定的机制。
multiprocessing 模块提供了类似于 threading 模块的 API,但使用的是进程而不是线程,
这使得它能够充分利用多核 CPU 的优势,避免 GIL(全局解释器锁)的限制。
主要特点:
1. 进程独立:每个进程有独立的内存空间,避免了线程间共享数据可能导致的复杂性。
2. 利用多核:可以充分利用多核 CPU,实现真正的并行计算。
3. 丰富的工具:提供了 Process、Pool、Queue、Pipe、Lock、Event、Semaphore 等多种工具,
用于进程的创建、管理、通信和同步。
本教程将详细介绍 multiprocessing 模块的常用功能和使用方法。
"""
import os
import time
import random
from multiprocessing import Process, Queue, Pipe, Lock, Event, Semaphore, Value, Array, Pool, Manager
# --- 1. Process 类的使用 ---
# Process 类是 multiprocessing 模块中最基本的组件,用于创建新的进程。
def worker_function(name):
"""
一个简单的进程工作函数,打印进程信息和执行任务。
:param name: 进程名称
"""
print(f"子进程 {name} 启动,PID: {os.getpid()}")
time.sleep(random.uniform(0.5, 2)) # 模拟耗时操作
print(f"子进程 {name} 结束,PID: {os.getpid()}")
def process_example():
"""
演示 Process 类的基本使用。
"""
print("\n--- Process 类使用示例 ---")
print(f"主进程启动,PID: {os.getpid()}")
# 创建一个 Process 对象
# target 参数指定进程要执行的函数
# args 参数以元组形式传递给 target 函数的参数
p1 = Process(target=worker_function, args=("Worker-1",))
p2 = Process(target=worker_function, args=("Worker-2",))
# 启动子进程
p1.start()
p2.start()
# 等待子进程结束
# join() 方法会阻塞主进程,直到对应的子进程执行完毕。
# 注意:p1.join() 和 p2.join() 是主进程等待子进程的独立操作,
# 它们不意味着 p2 必须等待 p1 完成。子进程 p1 和 p2 是并行执行的。
p1.join()
p2.join()
print("所有子进程已结束,主进程继续执行。")
# --- 2. Pool 类的使用 ---
# Pool 类提供了一种方便的方式来管理一组工作进程,可以并行地执行任务。
# 它会自动管理进程的创建和销毁,并提供 map、apply、apply_async 等方法。
def cube(x):
"""
计算一个数的立方。
:param x: 输入数字
:return: x 的立方
"""
print(f"进程 {os.getpid()} 计算 {x}^3")
time.sleep(0.1) # 模拟耗时
return x * x * x
def pool_example():
"""
演示 Pool 类的基本使用。
"""
print("\n--- Pool 类使用示例 ---")
# 创建一个包含 4 个工作进程的进程池
# 默认情况下,进程池的大小是 os.cpu_count()
with Pool(processes=4) as pool:
# 使用 map 方法并行处理可迭代对象中的每个元素
# map 会阻塞直到所有结果都返回,并且结果的顺序与输入顺序一致
results = pool.map(cube, range(1, 6))
print(f"map 方法结果: {results}")
# 使用 apply_async 方法异步提交任务
# apply_async 不会阻塞,会立即返回一个 AsyncResult 对象
# 可以通过 AsyncResult.get() 获取结果,get() 会阻塞直到结果可用
async_results = [pool.apply_async(cube, (i,)) for i in range(6, 11)]
print("apply_async 任务已提交,等待结果...")
for res in async_results:
print(f"apply_async 方法结果: {res.get()}")
print("Pool 示例结束。")
# --- 3. 进程间通信 (IPC) ---
# 进程之间由于内存独立,需要特定的机制进行数据交换。multiprocessing 提供了 Queue 和 Pipe。
# 3.1 Queue (队列)
# Queue 是一个多进程安全的队列,可以用于在多个进程之间传递数据。
def producer(q, data):
"""
生产者进程,向队列中放入数据。
:param q: 队列对象
:param data: 要放入队列的数据
"""
print(f"生产者 {os.getpid()} 启动,准备放入数据: {data}")
for item in data:
q.put(item)
print(f"生产者 {os.getpid()} 放入: {item}")
time.sleep(random.uniform(0.1, 0.5))
q.put(None) # 发送结束信号
print(f"生产者 {os.getpid()} 结束。")
def consumer(q):
"""
消费者进程,从队列中取出数据。
:param q: 队列对象
"""
print(f"消费者 {os.getpid()} 启动,准备取出数据。")
while True:
item = q.get()
if item is None: # 收到结束信号
break
print(f"消费者 {os.getpid()} 取出: {item}")
time.sleep(random.uniform(0.1, 0.5))
print(f"消费者 {os.getpid()} 结束。")
def queue_example():
"""
演示 Queue 的使用。
"""
print("\n--- Queue 进程间通信示例 ---")
q = Queue() # 创建一个队列
data_to_produce = ["apple", "banana", "cherry", "date", "elderberry"]
p_producer = Process(target=producer, args=(q, data_to_produce))
p_consumer = Process(target=consumer, args=(q,))
p_producer.start()
p_consumer.start()
p_producer.join()
p_consumer.join()
print("Queue 示例结束。")
# 3.2 Pipe (管道)
# Pipe 用于在两个进程之间创建双向通信通道。它返回一对连接对象 (conn1, conn2)。
# 每个连接对象都有 send() 和 recv() 方法。
def sender(conn, messages):
"""
发送方进程,通过管道发送消息。
:param conn: 管道连接对象
:param messages: 要发送的消息列表
"""
print(f"发送方 {os.getpid()} 启动,准备发送消息。")
for msg in messages:
conn.send(msg)
print(f"发送方 {os.getpid()} 发送: {msg}")
time.sleep(random.uniform(0.1, 0.5))
time.sleep(0.1) # 确保接收方有时间处理最后一条消息
conn.close() # 关闭连接
print(f"发送方 {os.getpid()} 结束。")
def receiver(conn):
"""
接收方进程,通过管道接收消息。
:param conn: 管道连接对象
"""
print(f"接收方 {os.getpid()} 启动,准备接收消息。")
while True:
try:
msg = conn.recv()
print(f"接收方 {os.getpid()} 接收: {msg}")
time.sleep(random.uniform(0.1, 0.5)) # 模拟处理时间
except EOFError: # 当发送方关闭连接时,recv() 会抛出 EOFError
break # 收到结束信号,立即退出循环
except Exception as e:
print(f"接收方 {os.getpid()} 发生错误: {e}")
break
conn.close() # 关闭连接
print(f"接收方 {os.getpid()} 结束。")
def pipe_example():
"""
演示 Pipe 的使用。
"""
print("\n--- Pipe 进程间通信示例 ---")
parent_conn, child_conn = Pipe() # 创建一个管道
messages_to_send = ["Hello", "from", "parent", "process", "via", "Pipe!"]
p_sender = Process(target=sender, args=(parent_conn, messages_to_send))
p_receiver = Process(target=receiver, args=(child_conn,))
p_sender.start()
p_receiver.start()
p_sender.join()
p_receiver.join()
print("Pipe 示例结束。")
# --- 4. 进程同步 ---
# 当多个进程需要访问共享资源时,为了避免数据竞争和不一致性,需要使用同步机制。
# 4.1 Lock (锁)
# Lock 用于控制对共享资源的访问,确保在任何给定时间只有一个进程可以访问该资源。
def increment_with_lock(counter, lock):
"""
使用锁保护共享计数器递增。
:param counter: 共享计数器 (multiprocessing.Value)
:param lock: 锁对象
"""
for _ in range(100000):
lock.acquire() # 获取锁
try:
counter.value += 1
finally:
lock.release() # 释放锁
def lock_example():
"""
演示 Lock 的使用。
"""
print("\n--- Lock 进程同步示例 ---")
# Value 用于在进程间共享一个可变的值
counter = Value('i', 0) # 'i' 表示整数类型,0 是初始值
lock = Lock() # 创建一个锁对象
processes = []
for i in range(5):
p = Process(target=increment_with_lock, args=(counter, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Lock 示例结束,最终计数器值: {counter.value}") # 预期值是 5 * 100000 = 500000
# 4.2 Event (事件)
# Event 是一种简单的进程间通信机制,一个进程可以设置一个标志,其他进程可以等待这个标志。
def waiter(event, name):
"""
等待事件被设置的进程。
:param event: 事件对象
:param name: 等待者名称
"""
print(f"等待者 {name} 启动,等待事件...")
event.wait() # 阻塞直到事件被设置
print(f"等待者 {name} 收到事件,继续执行。")
time.sleep(random.uniform(0.5, 1))
print(f"等待者 {name} 结束。")
def setter(event):
"""
设置事件的进程。
:param event: 事件对象
"""
print(f"设置者 {os.getpid()} 启动,将在 2 秒后设置事件。")
time.sleep(2)
event.set() # 设置事件,唤醒所有等待者
print(f"设置者 {os.getpid()} 已设置事件。")
time.sleep(1)
event.clear() # 清除事件,使其可以再次被等待
print(f"设置者 {os.getpid()} 已清除事件。")
print(f"设置者 {os.getpid()} 结束。")
def event_example():
"""
演示 Event 的使用。
"""
print("\n--- Event 进程同步示例 ---")
event = Event() # 创建一个事件对象
waiters = [Process(target=waiter, args=(event, f"Waiter-{i}")) for i in range(3)]
for w in waiters:
w.start()
p_setter = Process(target=setter, args=(event,))
p_setter.start()
for w in waiters:
w.join()
p_setter.join()
print("Event 示例结束。")
# 4.3 Semaphore (信号量)
# Semaphore 用于控制对有限资源的访问,它维护一个内部计数器,当计数器大于零时,
# 进程可以获取信号量(计数器减一),否则阻塞。释放信号量时,计数器加一。
def resource_user(semaphore, name):
"""
模拟使用有限资源的进程。
:param semaphore: 信号量对象
:param name: 用户名称
"""
print(f"用户 {name} 启动,尝试获取资源...")
semaphore.acquire() # 获取信号量,如果计数器为0则阻塞
try:
print(f"用户 {name} 成功获取资源,正在使用...")
time.sleep(random.uniform(1, 3)) # 模拟使用资源
finally:
semaphore.release() # 释放信号量
print(f"用户 {name} 释放资源。")
print(f"用户 {name} 结束。")
def semaphore_example():
"""
演示 Semaphore 的使用。
"""
print("\n--- Semaphore 进程同步示例 ---")
# 创建一个初始值为 2 的信号量,表示最多允许 2 个进程同时访问资源
semaphore = Semaphore(2)
users = [Process(target=resource_user, args=(semaphore, f"User-{i}")) for i in range(5)]
for u in users:
u.start()
for u in users:
u.join()
print("Semaphore 示例结束。")
# --- 5. 共享内存 ---
# multiprocessing 提供了两种方式来在进程间共享内存:Value 和 Array,以及 Manager。
# 5.1 Value (共享值)
# Value 用于在进程间共享一个单一的可变值。
def modify_value(num, lock):
"""
修改共享值的进程函数。
:param num: 共享值对象
:param lock: 锁对象,用于保护共享值的访问
"""
for _ in range(100000):
with lock: # 使用 with 语句自动获取和释放锁
num.value += 1
def value_example():
"""
演示 Value 的使用。
"""
print("\n--- Value 共享值示例 ---")
# 'i' 表示整数类型,0 是初始值
shared_value = Value('i', 0)
lock = Lock() # 同样需要锁来保护共享值的并发访问
processes = []
for i in range(5):
p = Process(target=modify_value, args=(shared_value, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Value 示例结束,最终共享值: {shared_value.value}") # 预期值是 5 * 100000 = 500000
# 5.2 Array (共享数组)
# Array 用于在进程间共享一个可变数组。
def modify_array(arr, index, lock):
"""
修改共享数组元素的进程函数。
:param arr: 共享数组对象
:param index: 要修改的数组索引
:param lock: 锁对象,用于保护共享数组的访问
"""
for _ in range(100000):
with lock:
arr[index] += 1
def array_example():
"""
演示 Array 的使用。
"""
print("\n--- Array 共享数组示例 ---")
# 'i' 表示整数类型,[0, 0, 0] 是初始值
shared_array = Array('i', [0, 0, 0])
lock = Lock() # 同样需要锁来保护共享数组的并发访问
processes = []
for i in range(3):
p = Process(target=modify_array, args=(shared_array, i, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Array 示例结束,最终共享数组: {list(shared_array)}") # 预期值是 [100000, 100000, 100000]
# 5.3 Manager (管理器)
# Manager 提供了一种更灵活的方式来创建可以在进程之间共享的 Python 对象,
# 例如列表、字典、队列、锁等。Manager 创建的对象是服务器进程中的代理对象。
def modify_list_with_manager(shared_list, name):
"""
使用 Manager 共享列表的进程函数。
:param shared_list: Manager 创建的共享列表
:param name: 进程名称
"""
print(f"进程 {name} 启动,修改共享列表。")
for _ in range(5):
shared_list.append(f"{name}-{random.randint(1, 100)}")
time.sleep(random.uniform(0.1, 0.3))
print(f"进程 {name} 结束。")
def manager_example():
"""
演示 Manager 的使用。
"""
print("\n--- Manager 共享对象示例 ---")
with Manager() as manager:
shared_list = manager.list() # 创建一个共享列表
print(f"初始共享列表: {shared_list}")
processes = []
for i in range(3):
p = Process(target=modify_list_with_manager, args=(shared_list, f"Manager-Worker-{i}"))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"Manager 示例结束,最终共享列表: {shared_list}")
# --- 6. 实际应用示例:并行计算 ---
# 使用 Pool 模块进行并行计算,加速处理大量数据。
def calculate_square(number):
"""
计算一个数的平方,模拟耗时操作。
:param number: 输入数字
:return: 数字的平方
"""
time.sleep(0.01) # 模拟一些计算时间
return number * number
def parallel_calculation_example():
"""
演示使用 Pool 进行并行计算。
"""
print("\n--- 实际应用示例:并行计算 ---")
data = range(1000) # 假设有 1000 个数字需要计算平方
start_time = time.time()
# 使用单进程计算
single_process_results = [calculate_square(n) for n in data]
end_time = time.time()
print(f"单进程计算耗时: {end_time - start_time:.4f} 秒")
start_time = time.time()
# 使用进程池并行计算
with Pool(processes=os.cpu_count()) as pool: # 使用 CPU 核心数作为进程池大小
parallel_results = pool.map(calculate_square, data)
end_time = time.time()
print(f"多进程计算耗时: {end_time - start_time:.4f} 秒 (使用 {os.cpu_count()} 个进程)")
# 验证结果是否一致
assert single_process_results == parallel_results
print("单进程和多进程计算结果一致。")
print("并行计算示例结束。")
if __name__ == '__main__':
process_example()
pool_example()
queue_example()
# 代码有问题,暂时没有调试通
# pipe_example()
lock_example()
event_example()
semaphore_example()
value_example()
array_example()
manager_example()
parallel_calculation_example()
引用链接
[1]
multiprocessing --- 基于进程的并行 — Python 3.14.0 文档: https://docs.python.org/zh-cn/3.14/library/multiprocessing.html[2]
论坛:Multiprocessing in Python: The Complete Guide : r/programming: https://www.reddit.com/r/programming/comments/vqdrtu/multiprocessing_in_python_the_complete_guide/[3]
Python Multiprocessing: The Complete Guide - Super Fast Python: https://superfastpython.com/multiprocessing-in-python/[4]
Thread vs Process in Python - Super Fast Python: https://superfastpython.com/thread-vs-process/
评论区