目 录CONTENT

文章目录

Python并行编程:multiprocessing

Administrator
2025-10-15 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

 

字数 4840,阅读大约需 25 分钟

Python并行编程:multiprocessing

什么是multiprocessing

绕过GIL全局解释器锁的限制,使用子进程并发执行,充分利用现代多处理器;


主要特点和限制

Availability: not Android, not iOS, not WASI. 此模块在 移动平台 或 WebAssembly 平台 上不受支持。

概述

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 POSIX 和 Windows 上均可运行。

multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。

核心资料

multiprocessing --- 基于进程的并行 — Python 3.14.0 文档[1]

底层操作系统控制着新进程的创建方式。在某些系统上,这可能需要生成一个新进程,而在另一些系统上,则可能需要对进程进行分叉。Python中用于创建新进程的特定于操作系统的方法无需我们担心,因为它由你安装的Python解释器进行管理

论坛:Multiprocessing in Python: The Complete Guide : r/programming[2]

Python Multiprocessing: The Complete Guide - Super Fast Python[3]
完整学习多处理器,请务必阅读上面的文章!!!

Python进程

每个Python程序都是一个进程,并且有一个名为主线程的默认线程,用于执行程序指令。实际上,每个进程都是执行Python指令(Python字节码)的Python解释器的一个实例,其级别比你在Python程序中输入的代码稍低。2组基本概念:

  1. 1. Concurrent: Code that can be executed out of order.并发:可以无序执行的代码。

  2. 2. Parallel: Capability to execute code simultaneously.并行:同时执行代码的能力。

进程和线程

线程始终存在于进程中,它代表了指令或代码的执行方式。一个进程至少会有一个线程,称为主线程。我们在该进程内创建的任何额外线程都将属于该进程。Python解释器的实例至少有一个名为主线程的线程。Python进程中的执行线程,例如主线程或新线程。


    
    
    
  # a target function that does something
def work()
    # do something...
 
# create a thread to execute the work() function
thread = Thread(target=work)
# start the thread
thread.start()

Thread vs Process in Python - Super Fast Python[4]

父进程和子进程

创建子进程的进程称为父进程。一个子进程永远只有一个父进程。子进程也可称为 subprocess,因为它是由另一个进程创建的,而非由操作系统直接创建。话虽如此,所有进程的创建和管理都是由底层操作系统处理的。
创建子进程的3种方式:fork、spawn和fork server。multiprocessing --- 基于进程的并行 — Python 3.14.0 文档[1]

Run Code 示例演示

参数可以指定为元组并传递给multiprocessing.Process类构造函数的“args”参数,或者作为字典传递给“kwargs”参数。


    
    
    
  process = multiprocessing.Process(target=task, args=(arg1, arg2))
process.start()

将创建一个新的Python解释器实例,并在新进程中创建一个新线程来执行我们的目标函数。
限制🚫:我们无法精确控制进程何时执行,也无法控制哪个CPU核心会执行该进程。这两项都是由底层操作系统负责处理的低级任务。


    
    
    
  # SuperFastPython.com
# example of running a function in another process
from time import sleep
from multiprocessing import Process
 
# a custom function that blocks for a moment
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('This is from another process')
 
# entry point
if __name__ == '__main__':
    # create a process
    process = Process(target=task)
    # run the process
    process.start()
    # wait for the process to finish
    print('Waiting for the process...')
    process.join()

更多完整学习路线

1. Python Processes(Python进程)

  • • What Are Processes(什么是进程)

  • • Thread vs Process(线程与进程)

  • • Life-Cycle of a Process(进程的生命周期)

  • • Child vs Parent Process(子进程与父进程)

2. Run a Function in a Process(在进程中运行函数)

  • • How to Run a Function In a Process(如何在进程中运行函数)

  • • Example of Running a Function in a Process(示例)

  • • Example of Running a Function in a Process With Arguments(带参数的示例)

3. Extend the Process Class(扩展Process类)

  • • How to Extend the Process Class(如何扩展Process类)

  • • Example of Extending the Process Class(示例)

  • • Example of Extending the Process Class and Returning Values(返回值示例)

4. Process Start Methods(进程启动方法)

  • • What is a Start Method(什么是启动方法)

  • • How to Change The Start Method(如何更改启动方法)

  • • How to Set Start Method Via Context(通过上下文设置启动方法)

5. Process Instance Attributes(进程实例属性)

  • • Query Process Name(查询进程名称)

  • • Query Process Daemon(查询守护进程)

  • • Query Process PID(查询进程PID)

  • • Query Process Alive(查询进程存活状态)

  • • Query Process Exit Code(查询进程退出码)

6. Configure Processes(配置进程)

  • • How to Configure Process Name(如何配置进程名称)

  • • How to Configure a Daemon Process(如何配置守护进程)

7. Main Process(主进程)

  • • What is the Main Process(什么是主进程)

  • • How Can the Main Process Be Identified(如何识别主进程)

  • • How to Get the Main Process(如何获取主进程)

  • • What is the Name of the Main Process(主进程的名称)

8. Process Utilities(进程工具)

  • • Active Child Processes(活动子进程)

  • • Get The Number of CPU Cores(获取CPU核心数)

  • • The Current Process(当前进程)

  • • The Parent Process(父进程)

9. Process Mutex Lock(进程互斥锁)

  • • What is a Mutual Exclusion Lock(什么是互斥锁)

  • • How to Use a Mutex Lock(如何使用互斥锁)

  • • Example of Using a Mutex Lock(示例)

10. Process Reentrant Lock(进程可重入锁)

  • • What is a Reentrant Lock(什么是可重入锁)

  • • How to Use the Reentrant Lock(如何使用可重入锁)

  • • Example of Using a Reentrant Lock(示例)

11. Process Condition Variable(进程条件变量)

  • • What is a Process Condition Variable(什么是条件变量)

  • • How to Use a Condition Variable(如何使用条件变量)

  • • Example of Wait and Notify With a Condition Variable(等待和通知示例)

12. Process Semaphore(进程信号量)

  • • What is a Semaphore(什么是信号量)

  • • How to Use a Semaphore(如何使用信号量)

  • • Example of Using a Semaphore(示例)

13. Process Event(进程事件)

  • • How to Use an Event Object(如何使用事件对象)

  • • Example of Using an Event Object(示例)

14. Process Barrier(进程屏障)

  • • What is a Barrier(什么是屏障)

  • • How to Use a Barrier(如何使用屏障)

  • • Example of Using a Process Barrier(示例)

15. Python Multiprocessing Best Practices(最佳实践)

  • • Tip 1: Use Context Managers(使用上下文管理器)

  • • Tip 2: Use Timeouts When Waiting(等待时使用超时)

  • • Tip 3: Use Main Module Idiom(使用主模块习惯用法)

  • • Tip 4: Use Shared ctypes(使用共享ctypes)

  • • Tip 5: Use Pipes and Queues(使用管道和队列)

16. Python Multiprocessing Common Errors(常见错误)

  • • Error 1: RuntimeError Starting New Processes(启动新进程的运行时错误)

  • • Error 2: print() Does Not Work In Child Processes(print()在子进程中不工作)

  • • Error 3: Adding Attributes to Classes that Extend Process(向扩展Process的类添加属性)

17. Python Multiprocessing Common Questions(常见问题)

  • • How to Safely Stop a Process?(如何安全停止进程)

  • • How to Kill a Process?(如何杀死进程)

教程代码


    
    
    
  # -*- coding: utf-8 -*-
"""
Python multiprocessing 模块使用教程

multiprocessing 是 Python 中用于实现多进程编程的模块,它允许程序创建并管理多个独立的进程。
每个进程都有自己独立的内存空间,因此它们之间的数据共享需要特定的机制。
multiprocessing 模块提供了类似于 threading 模块的 API,但使用的是进程而不是线程,
这使得它能够充分利用多核 CPU 的优势,避免 GIL(全局解释器锁)的限制。

主要特点:
1. 进程独立:每个进程有独立的内存空间,避免了线程间共享数据可能导致的复杂性。
2. 利用多核:可以充分利用多核 CPU,实现真正的并行计算。
3. 丰富的工具:提供了 Process、Pool、Queue、Pipe、Lock、Event、Semaphore 等多种工具,
   用于进程的创建、管理、通信和同步。

本教程将详细介绍 multiprocessing 模块的常用功能和使用方法。
"""

import os
import time
import random
from multiprocessing import Process, Queue, Pipe, Lock, Event, Semaphore, Value, Array, Pool, Manager

# --- 1. Process 类的使用 ---
# Process 类是 multiprocessing 模块中最基本的组件,用于创建新的进程。

def worker_function(name):
    """
    一个简单的进程工作函数,打印进程信息和执行任务。
    :param name: 进程名称
    """
    print(f"子进程 {name} 启动,PID: {os.getpid()}")
    time.sleep(random.uniform(0.5, 2))  # 模拟耗时操作
    print(f"子进程 {name} 结束,PID: {os.getpid()}")

def process_example():
    """
    演示 Process 类的基本使用。
    """
    print("\n--- Process 类使用示例 ---")
    print(f"主进程启动,PID: {os.getpid()}")

    # 创建一个 Process 对象
    # target 参数指定进程要执行的函数
    # args 参数以元组形式传递给 target 函数的参数
    p1 = Process(target=worker_function, args=("Worker-1",))
    p2 = Process(target=worker_function, args=("Worker-2",))

    # 启动子进程
    p1.start()
    p2.start()

    # 等待子进程结束
    # join() 方法会阻塞主进程,直到对应的子进程执行完毕。
    # 注意:p1.join() 和 p2.join() 是主进程等待子进程的独立操作,
    # 它们不意味着 p2 必须等待 p1 完成。子进程 p1 和 p2 是并行执行的。
    p1.join()
    p2.join()

    print("所有子进程已结束,主进程继续执行。")

# --- 2. Pool 类的使用 ---
# Pool 类提供了一种方便的方式来管理一组工作进程,可以并行地执行任务。
# 它会自动管理进程的创建和销毁,并提供 map、apply、apply_async 等方法。

def cube(x):
    """
    计算一个数的立方。
    :param x: 输入数字
    :return: x 的立方
    """
    print(f"进程 {os.getpid()} 计算 {x}^3")
    time.sleep(0.1) # 模拟耗时
    return x * x * x

def pool_example():
    """
    演示 Pool 类的基本使用。
    """
    print("\n--- Pool 类使用示例 ---")
    # 创建一个包含 4 个工作进程的进程池
    # 默认情况下,进程池的大小是 os.cpu_count()
    with Pool(processes=4) as pool:
        # 使用 map 方法并行处理可迭代对象中的每个元素
        # map 会阻塞直到所有结果都返回,并且结果的顺序与输入顺序一致
        results = pool.map(cube, range(1, 6))
        print(f"map 方法结果: {results}")

        # 使用 apply_async 方法异步提交任务
        # apply_async 不会阻塞,会立即返回一个 AsyncResult 对象
        # 可以通过 AsyncResult.get() 获取结果,get() 会阻塞直到结果可用
        async_results = [pool.apply_async(cube, (i,)) for i in range(6, 11)]
        print("apply_async 任务已提交,等待结果...")
        for res in async_results:
            print(f"apply_async 方法结果: {res.get()}")

    print("Pool 示例结束。")

# --- 3. 进程间通信 (IPC) ---
# 进程之间由于内存独立,需要特定的机制进行数据交换。multiprocessing 提供了 Queue 和 Pipe。

# 3.1 Queue (队列)
# Queue 是一个多进程安全的队列,可以用于在多个进程之间传递数据。

def producer(q, data):
    """
    生产者进程,向队列中放入数据。
    :param q: 队列对象
    :param data: 要放入队列的数据
    """
    print(f"生产者 {os.getpid()} 启动,准备放入数据: {data}")
    for item in data:
        q.put(item)
        print(f"生产者 {os.getpid()} 放入: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    q.put(None) # 发送结束信号
    print(f"生产者 {os.getpid()} 结束。")

def consumer(q):
    """
    消费者进程,从队列中取出数据。
    :param q: 队列对象
    """
    print(f"消费者 {os.getpid()} 启动,准备取出数据。")
    while True:
        item = q.get()
        if item is None: # 收到结束信号
            break
        print(f"消费者 {os.getpid()} 取出: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    print(f"消费者 {os.getpid()} 结束。")

def queue_example():
    """
    演示 Queue 的使用。
    """
    print("\n--- Queue 进程间通信示例 ---")
    q = Queue() # 创建一个队列

    data_to_produce = ["apple", "banana", "cherry", "date", "elderberry"]

    p_producer = Process(target=producer, args=(q, data_to_produce))
    p_consumer = Process(target=consumer, args=(q,))

    p_producer.start()
    p_consumer.start()

    p_producer.join()
    p_consumer.join()
    print("Queue 示例结束。")

# 3.2 Pipe (管道)
# Pipe 用于在两个进程之间创建双向通信通道。它返回一对连接对象 (conn1, conn2)。
# 每个连接对象都有 send() 和 recv() 方法。

def sender(conn, messages):
    """
    发送方进程,通过管道发送消息。
    :param conn: 管道连接对象
    :param messages: 要发送的消息列表
    """
    print(f"发送方 {os.getpid()} 启动,准备发送消息。")
    for msg in messages:
        conn.send(msg)
        print(f"发送方 {os.getpid()} 发送: {msg}")
        time.sleep(random.uniform(0.1, 0.5))
    time.sleep(0.1) # 确保接收方有时间处理最后一条消息
    conn.close() # 关闭连接
    print(f"发送方 {os.getpid()} 结束。")

def receiver(conn):
    """
    接收方进程,通过管道接收消息。
    :param conn: 管道连接对象
    """
    print(f"接收方 {os.getpid()} 启动,准备接收消息。")
    while True:
        try:
            msg = conn.recv()
            print(f"接收方 {os.getpid()} 接收: {msg}")
            time.sleep(random.uniform(0.1, 0.5)) # 模拟处理时间
        except EOFError: # 当发送方关闭连接时,recv() 会抛出 EOFError
            break # 收到结束信号,立即退出循环
        except Exception as e:
            print(f"接收方 {os.getpid()} 发生错误: {e}")
            break
    conn.close() # 关闭连接
    print(f"接收方 {os.getpid()} 结束。")

def pipe_example():
    """
    演示 Pipe 的使用。
    """
    print("\n--- Pipe 进程间通信示例 ---")
    parent_conn, child_conn = Pipe() # 创建一个管道

    messages_to_send = ["Hello", "from", "parent", "process", "via", "Pipe!"]

    p_sender = Process(target=sender, args=(parent_conn, messages_to_send))
    p_receiver = Process(target=receiver, args=(child_conn,))

    p_sender.start()
    p_receiver.start()

    p_sender.join()
    p_receiver.join()
    print("Pipe 示例结束。")

# --- 4. 进程同步 ---
# 当多个进程需要访问共享资源时,为了避免数据竞争和不一致性,需要使用同步机制。

# 4.1 Lock (锁)
# Lock 用于控制对共享资源的访问,确保在任何给定时间只有一个进程可以访问该资源。

def increment_with_lock(counter, lock):
    """
    使用锁保护共享计数器递增。
    :param counter: 共享计数器 (multiprocessing.Value)
    :param lock: 锁对象
    """
    for _ in range(100000):
        lock.acquire() # 获取锁
        try:
            counter.value += 1
        finally:
            lock.release() # 释放锁

def lock_example():
    """
    演示 Lock 的使用。
    """
    print("\n--- Lock 进程同步示例 ---")
    # Value 用于在进程间共享一个可变的值
    counter = Value('i', 0) # 'i' 表示整数类型,0 是初始值
    lock = Lock() # 创建一个锁对象

    processes = []
    for i in range(5):
        p = Process(target=increment_with_lock, args=(counter, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Lock 示例结束,最终计数器值: {counter.value}") # 预期值是 5 * 100000 = 500000

# 4.2 Event (事件)
# Event 是一种简单的进程间通信机制,一个进程可以设置一个标志,其他进程可以等待这个标志。

def waiter(event, name):
    """
    等待事件被设置的进程。
    :param event: 事件对象
    :param name: 等待者名称
    """
    print(f"等待者 {name} 启动,等待事件...")
    event.wait() # 阻塞直到事件被设置
    print(f"等待者 {name} 收到事件,继续执行。")
    time.sleep(random.uniform(0.5, 1))
    print(f"等待者 {name} 结束。")

def setter(event):
    """
    设置事件的进程。
    :param event: 事件对象
    """
    print(f"设置者 {os.getpid()} 启动,将在 2 秒后设置事件。")
    time.sleep(2)
    event.set() # 设置事件,唤醒所有等待者
    print(f"设置者 {os.getpid()} 已设置事件。")
    time.sleep(1)
    event.clear() # 清除事件,使其可以再次被等待
    print(f"设置者 {os.getpid()} 已清除事件。")
    print(f"设置者 {os.getpid()} 结束。")

def event_example():
    """
    演示 Event 的使用。
    """
    print("\n--- Event 进程同步示例 ---")
    event = Event() # 创建一个事件对象

    waiters = [Process(target=waiter, args=(event, f"Waiter-{i}")) for i in range(3)]
    for w in waiters:
        w.start()

    p_setter = Process(target=setter, args=(event,))
    p_setter.start()

    for w in waiters:
        w.join()
    p_setter.join()
    print("Event 示例结束。")

# 4.3 Semaphore (信号量)
# Semaphore 用于控制对有限资源的访问,它维护一个内部计数器,当计数器大于零时,
# 进程可以获取信号量(计数器减一),否则阻塞。释放信号量时,计数器加一。

def resource_user(semaphore, name):
    """
    模拟使用有限资源的进程。
    :param semaphore: 信号量对象
    :param name: 用户名称
    """
    print(f"用户 {name} 启动,尝试获取资源...")
    semaphore.acquire() # 获取信号量,如果计数器为0则阻塞
    try:
        print(f"用户 {name} 成功获取资源,正在使用...")
        time.sleep(random.uniform(1, 3)) # 模拟使用资源
    finally:
        semaphore.release() # 释放信号量
        print(f"用户 {name} 释放资源。")
    print(f"用户 {name} 结束。")

def semaphore_example():
    """
    演示 Semaphore 的使用。
    """
    print("\n--- Semaphore 进程同步示例 ---")
    # 创建一个初始值为 2 的信号量,表示最多允许 2 个进程同时访问资源
    semaphore = Semaphore(2)

    users = [Process(target=resource_user, args=(semaphore, f"User-{i}")) for i in range(5)]
    for u in users:
        u.start()

    for u in users:
        u.join()
    print("Semaphore 示例结束。")

# --- 5. 共享内存 ---
# multiprocessing 提供了两种方式来在进程间共享内存:Value 和 Array,以及 Manager。

# 5.1 Value (共享值)
# Value 用于在进程间共享一个单一的可变值。

def modify_value(num, lock):
    """
    修改共享值的进程函数。
    :param num: 共享值对象
    :param lock: 锁对象,用于保护共享值的访问
    """
    for _ in range(100000):
        with lock: # 使用 with 语句自动获取和释放锁
            num.value += 1

def value_example():
    """
    演示 Value 的使用。
    """
    print("\n--- Value 共享值示例 ---")
    # 'i' 表示整数类型,0 是初始值
    shared_value = Value('i', 0)
    lock = Lock() # 同样需要锁来保护共享值的并发访问

    processes = []
    for i in range(5):
        p = Process(target=modify_value, args=(shared_value, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Value 示例结束,最终共享值: {shared_value.value}") # 预期值是 5 * 100000 = 500000

# 5.2 Array (共享数组)
# Array 用于在进程间共享一个可变数组。

def modify_array(arr, index, lock):
    """
    修改共享数组元素的进程函数。
    :param arr: 共享数组对象
    :param index: 要修改的数组索引
    :param lock: 锁对象,用于保护共享数组的访问
    """
    for _ in range(100000):
        with lock:
            arr[index] += 1

def array_example():
    """
    演示 Array 的使用。
    """
    print("\n--- Array 共享数组示例 ---")
    # 'i' 表示整数类型,[0, 0, 0] 是初始值
    shared_array = Array('i', [0, 0, 0])
    lock = Lock() # 同样需要锁来保护共享数组的并发访问

    processes = []
    for i in range(3):
        p = Process(target=modify_array, args=(shared_array, i, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Array 示例结束,最终共享数组: {list(shared_array)}") # 预期值是 [100000, 100000, 100000]

# 5.3 Manager (管理器)
# Manager 提供了一种更灵活的方式来创建可以在进程之间共享的 Python 对象,
# 例如列表、字典、队列、锁等。Manager 创建的对象是服务器进程中的代理对象。

def modify_list_with_manager(shared_list, name):
    """
    使用 Manager 共享列表的进程函数。
    :param shared_list: Manager 创建的共享列表
    :param name: 进程名称
    """
    print(f"进程 {name} 启动,修改共享列表。")
    for _ in range(5):
        shared_list.append(f"{name}-{random.randint(1, 100)}")
        time.sleep(random.uniform(0.1, 0.3))
    print(f"进程 {name} 结束。")

def manager_example():
    """
    演示 Manager 的使用。
    """
    print("\n--- Manager 共享对象示例 ---")
    with Manager() as manager:
        shared_list = manager.list() # 创建一个共享列表
        print(f"初始共享列表: {shared_list}")

        processes = []
        for i in range(3):
            p = Process(target=modify_list_with_manager, args=(shared_list, f"Manager-Worker-{i}"))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Manager 示例结束,最终共享列表: {shared_list}")

# --- 6. 实际应用示例:并行计算 ---
# 使用 Pool 模块进行并行计算,加速处理大量数据。

def calculate_square(number):
    """
    计算一个数的平方,模拟耗时操作。
    :param number: 输入数字
    :return: 数字的平方
    """
    time.sleep(0.01) # 模拟一些计算时间
    return number * number

def parallel_calculation_example():
    """
    演示使用 Pool 进行并行计算。
    """
    print("\n--- 实际应用示例:并行计算 ---")
    data = range(1000) # 假设有 1000 个数字需要计算平方

    start_time = time.time()
    # 使用单进程计算
    single_process_results = [calculate_square(n) for n in data]
    end_time = time.time()
    print(f"单进程计算耗时: {end_time - start_time:.4f} 秒")

    start_time = time.time()
    # 使用进程池并行计算
    with Pool(processes=os.cpu_count()) as pool: # 使用 CPU 核心数作为进程池大小
        parallel_results = pool.map(calculate_square, data)
    end_time = time.time()
    print(f"多进程计算耗时: {end_time - start_time:.4f} 秒 (使用 {os.cpu_count()} 个进程)")

    # 验证结果是否一致
    assert single_process_results == parallel_results
    print("单进程和多进程计算结果一致。")
    print("并行计算示例结束。")

if __name__ == '__main__':
    process_example()
    pool_example()
    queue_example()
    # 代码有问题,暂时没有调试通
    # pipe_example()
    lock_example()
    event_example()
    semaphore_example()
    value_example()
    array_example()
    manager_example()
    parallel_calculation_example()

引用链接

[1] multiprocessing --- 基于进程的并行 — Python 3.14.0 文档: https://docs.python.org/zh-cn/3.14/library/multiprocessing.html
[2] 论坛:Multiprocessing in Python: The Complete Guide : r/programming: https://www.reddit.com/r/programming/comments/vqdrtu/multiprocessing_in_python_the_complete_guide/
[3] Python Multiprocessing: The Complete Guide - Super Fast Python: https://superfastpython.com/multiprocessing-in-python/
[4] Thread vs Process in Python - Super Fast Python: https://superfastpython.com/thread-vs-process/

 

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区