Python多进程

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁

因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 UnixWindows 上均可运行。 multiprocessing 模块还引入了 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。

Process

通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Queue 队列

进程之间的通信通道之一

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Pipe 管道

进程之间的通信通道之一,返回一个由管道连接的连接对象,默认情况下是双工(双向)。 返回的两个连接对象表示管道的两端。每个连接对象都有 send()recv() 方法(相互之间的)。 { note waring no-icon} 请注意,如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。 { endnote}

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

进程间同步

使用锁来确保一次只有一个进程打印到标准输出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程间共享状态

共享内存

可以使用 ValueArray 将数据存储在共享内存映射中。

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    # 3.1415927
    print(arr[:])
    # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型的 typecode'd' 表示双精度浮点数, 'i' 表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务进程

Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。 Manager() 返回的管理器支持类型: listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 。例如

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        # {0.25: None, 1: '1', '2': 2}
        print(l)
        # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

使用工作进程 Pool()

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # 开 4 个工作进程
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # 以任意顺序打印相同的数字
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # 异步启动多个 evaluations *可能*使用更多进程
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # 使一个进程 sleep 10 秒
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("我们得到一个 multiprocessing.TimeoutError")

        print("目前,pool 仍然可供更多工作")

    # exiting the 'with'-block has stopped the pool
    print("现在 pool 已关闭,不再可用")

{ note info } 参数 processes 默认大小是CPU的核数, map(func, iterator) 获取结果,参数 func 是进程执行的函数,iterator 是可迭代对象 特点:保持阻塞直到获得结果,对于很长的迭代对象,可能消耗很多内存,可以考虑使用 imap() 或 imap_unordered() 并且显示指定 chunksize 以提升效率。 apply_async(func[, args]) 获取结果,参数 func 是进程执行的函数,args 是可迭代对象 { endnote }