multiprocessing - Python中的并发处理

之前写过java,C++ 等程序,接触到python后如果想写一些需要大量计算的程序,首先想到的可能会是一个多线程程序。但是……

threading模块和multiprocessing模块

在python中threading模块用于处理多线程问题,但是由于Python的GIL(全局解释锁),导致python中的多线程不能利用多核CPU。通过实际coding可以发现,python中使用threading实现的多线程计算程序实际上最多只能使用一个CPU核心 ,所以对于需要大量计算的应用来说,threading模块实际起不到什么作用。

计算机程序可以分为 计算密集型IO密集型 两种:
通过以上分析可以发现,threading模块由于不能充分利用多核CPU,所以对于计算密集型的程序是没有意义的。
但是对于IO密集型程序,threading模块却能够利用CPU的性能。如果程序中需要进行大量的网络传输或者文件读写等IO操作时,由于计算机IO的速度远没有CPU处理数据的速度快,所以必然会出现CPU等待IO完成的现象,如果在一段代码处在等待IO时执行其他待执行的代码,必然能够加快程序的执行速度并充分利用CPU。

所有threading模块对于IO密集型程序有优化作用,对于计算密集型的程序基本没有什么作用。

对于 计算密集型的程序,python使用的是multiprocessing,即多进程。下文整理python的多进程使用方法。首先认识进程对象Process。然后进程安全机制。进程池pool的使用。

Process

初始化

1
2
3
4
5
class multiprocessing.Process(group=None  # 为了和Threading保持统一,此处无用
,target=None # 目标函数
,name=None # 该进程的名字
,args=() # 目标函数的位置参数&元组参数
,kwargs={}) # 目标函数的字典参数

关键函数:

start() # 开始执行
is_alive() #
join([timeout]) #
terminate() # 终止
exitcode # 属性,表明该进程退出的状态, 状态值的含义 https://docs.python.org/2/library/signal.html

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import Process
import time
import os

def info(title):
process_info = u'title:' + title
process_info += u'|module name:' + __name__
if hasattr(os, u'getppid'): # only available on Unix
process_info += u'|parent process:' + unicode(os.getppid())
process_info += u'|process id:' + unicode(os.getpid())
print process_info

def f(name):
info(u'function f')
time.sleep(0.5)
print u'hello', name

info(u'main line')
p1 = Process(target=f, args=(u'bob',))
p1.start()
print u'p1 is_alive:', p1.is_alive()
p1.join()
print u'p1 is_alive:', p1.is_alive()

p2 = Process(target=f, args=(u'tom',))
p2.start()
print u'p2 started, p2 is_alive:', p2.is_alive()
p2.terminate()
time.sleep(0.1) # 需要一段时间才能停止p2
print u'p2 terminated, p2 is_alive:', p2.is_alive()
print p2.exitcode
================================outpupt===================================================
title:main line|module name:__main__|parent process:2794|process id:2824
title:function f|module name:__main__|parent process:2824|process id:3761
p1 is_alive: True
hello bob
p1 is_alive: False
p2 started, p2 is_alive: True
p2 terminated, p2 is_alive: False
-15

进程之间传输数据

(Exchanging objects between processes)

有两种进程之间通信的方式Pipes和Queues。

  • Pipe() 两个进程之间的通信
  • queue多生产者和多消费者之间通信。包括Queue, multiprocessing.queues.SimpleQueueJoinableQueue 三种

注意:Pipe(), 和 queue 只能用于Process之间的通信。不用用于Pool管理的进程之间的通信

Pipe

定义:

1
Pipe(duplex=True)  # 通过查看源码可以发现,Pipe实际上是一个函数

传输的数据对象可以使用pickle进行序列化时

  • send(obj) 将obj使用pickle序列化后送入pipe通道
  • recv() 接受send(obj)发送的pickle序列化数据,并解析成原始的obj对象。如果pipe中没有数据时,该函数会阻塞其所在进程,直到接收到新的数据.

如果传输的对象不能使用pickle序列化,可以使用如下方法以byte进行传输:

  • send_bytes(buffer[, offset[, size]])

  • recv_bytes([maxlength])

  • recv_bytes_into(buffer[, offset])

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from multiprocessing import Process, Pipe
import time
import os

def client(conn):
i = 0
while i < 20:
print 'client', 'send',i
conn.send(i)
time.sleep(0.1)
i += 1
r = conn.recv()
print 'client', 'recv',r
conn.send(None)
conn.close()

def calc(conn):
i = 0
while True:
r = conn.recv()
if r == None:
break
print 'calc', 'recv', r
time.sleep(0.5)
conn.send(str(r) + '*' + str(r) + '=' + str(r*r))
conn.close()

# duplex=True 双向通信 ,
# duplex=False 单向传输,conn1 接收端,conn2 发送端
conn1, conn2 = Pipe(duplex=True)
send_pro = Process(target=client, args=(conn2,))
recv_pro = Process(target=calc, args=(conn1,))
send_pro.start()
recv_pro.start()
send_pro.join()
recv_pro.join()

Queues

用于进程间通信的还有队列,多进程的队列对象实际上是对Pipe的封装。存在如下三个队列类。

1
2
3
multiprocessing.Queue  # 对Pipe的封装
multiprocessing.JoinableQueue # 继承自Queue,增加了join() 和 task_done() 方法
multiprocessing.SimpleQueue # 对Pipe的封装, 相对于Queue来说 更简单

Queue

使用Queue将数据从生产者传输到消费之的流程如下。

如上所述,Queue 实际上是对Pipe的封装,但是当生产者将数据放入Queue时,不是直接放入Pipe中,Queue使用了一个缓冲区Bufferput函数先将数据放入Buffer中,再由一个专门的feed线程将其放入Pipe当中。消费之则是直接从Pipeget对象。

定义:

1
Queue([maxsize])  # 队列同时能容纳的对象的数量

关键函数:

  • 使用标准库中的 Queue.EmptyQueue.Fullexceptions 来判断队列是否已经满了,用在put函数中。也可以使用如下两个函数,但是不可靠。empty()full()
  • put(obj[, block[, timeout]])
    obj添加到Queue中。如果队列已满,则阻塞该进程timeout长时间,如果timeout时间以后队列还是满的,则产生异常Queue.Fullblock 默认 Truetimeout 默认无穷大(None)。blockFalse时,如果队列是满的直接产生异常Queue.Full
  • put_nowait(obj)

    等价于 put(obj, block=False).

  • get([block[, timeout]])
    从队列中取出一个对象。如果队列是空的则阻塞该进程timeout长时间。如果timeout时间以后队列还是空的,则产生异常Queue.Emptyblock 默认 Truetimeout 默认无穷大(None)。blockFalse时,如果队列是空的直接产生异常Queue.Empty
  • get_nowait()

    等价于 get(block=False).

由于队列中feed线程的存在,Queue使用如下三个函数来对其进程处理。(标准库中的队列没有如下三个方法)

  • close()
    该进程不在向队列中写入数据,并调用join_thread(),将buffer中的数据写入Pipe
    注意:只能在生产者端使用。如果在消费者端使用,则消费之不能从队列中get数据。但是生产者仍然可以写入数据。
  • join_thread()

    join feed线程,等待buffer中的数据写入Pipe

  • cancel_join_thread()

    假设消费者不在消费数据,则由于join_thread()可能带来一些死锁问题,即,Buffer的数据无法写入Pipe中。这时可以使用 cancel_join_thread() 来终止feed线程。注意:此时buffer中的数据将会丢失

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
from multiprocessing import Process, Queue, current_process
from Queue import Empty
def produce(q):
for i in xrange(0, 20):
print "process name: " + current_process().name + ', put:' + str(i)
q.put(i)
time.sleep(0.1)
q.close()


def consum(q):
try_times = 0
while True:
try:
r = q.get(True, 1)
print "process name: " + current_process().name + ', get:' + str(r)
except Empty:
continue


if __name__ == '__main__':
q = Queue(5)
producers = [Process(target=produce, name='p' + str(i), args=(q,)) for i in xrange(0, 3)]
consumers = [Process(target=consum, name='c' + str(i), args=(q,)) for i in xrange(0, 2)]
for c in consumers:
c.start()
for p in producers:
p.start()
p.join()
# 不要直接这样做,此时还不确定消费之是否已经处理完成数据。还是推荐使用JoinableQueue
for c in consumers:
c.terminate()

JoinableQueue

multiprocessing.Queue是模仿标准库中的队列写的,但是没有task_done()join() 方法。JoinableQueue继承了multiprocessing.Queue并实现了task_done()join() 方法。

task_done()
由队列的消费者调用。消费则调用get()得到一个队列中的数据(任务),处理完成这个数据以后,调用task_done()告诉队列该任务已经处理完毕。队列中的每一个对象都对应一个task_done()

join()

阻塞调用进程,直到队列中的所有数据(任务)被消费掉。当有数据被加入队列,未完成的任务数就会增加。当消费者调用task_done(),意味着有消费者取得任务并完成任务,未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
from multiprocessing import Process, JoinableQueue, current_process
from Queue import Empty

def produce(q):
for i in xrange(0, 20):
print "process name: " + current_process().name + ', put:' + str(i)
q.put(i)
time.sleep(0.1)
q.close()

def consum(q):
try_times = 0
while True:
try:
r = q.get(True, 1)
print "process name: " + current_process().name + ', get:' + str(r)
q.task_done() # 数据r已经处理完毕
except Empty:
continue

if __name__ == '__main__':
q = JoinableQueue(5)
producers = [Process(target=produce, name='p' + str(i), args=(q,)) for i in xrange(0, 3)]
consumers = [Process(target=consum, name='c' + str(i), args=(q,)) for i in xrange(0, 2)]
for c in consumers:
c.start()
for p in producers:
p.start()
p.join()
q.join()
print 'all task finished'
for c in consumers:
c.terminate()

Synchronization机制

(多进程同步操作共享资源)

Lock()
在多进程python程序中只有一个进程执行,并阻塞其他进程。

应用场景:当多个进程需要操作共享资源的时候,使用Lock来避免操作的冲突。

如下面例子,10个进程同步写入文件。当一个进程进程开始写入时(即lock.acquire()执行后),阻塞其他进程的操作。直到该进程执行lock.release() 后,其他进程才能进行写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing
import sys

def worker_with(lock, f, i):
with lock:
fs = open(f, "a+")
fs.write('Lock acquired via with ' + str(i) + '\n')
fs.close()

def worker_no_with(lock, f, i):
lock.acquire()
try:
fs = open(f, "a+")
fs.write('Lock acquired via with ' + str(i) + '\n')
fs.close()
finally:
lock.release()

if __name__ == "__main__":
f = "file.txt"
lock = multiprocessing.Lock()
# writers = [multiprocessing.Process(target=worker_with, args=(lock, f, i)) for i in xrange(0, 10)]
# for w in writers:
# w.start()
# w.join()
writers = [multiprocessing.Process(target=worker_no_with, args=(lock, f, i)) for i in xrange(0, 10)]
for w in writers:
w.start()
w.join()

进程间共享数据

共享内存

应用场景举例:比如我想使用一个多进程程序统计一个文件夹下所有文件的行数(每个进程一次统计一篇文章的行数)。如上图所示,多个进程同时读写同一个资源。

Python多进程机制使用的是从内存中申请一块内存,让所有的进程能够同时读写这块内容。 multiprocessing提供了multiprocessing.Valuemultiprocessing.Array 来作为共享内存。

multiprocessing.Valuemultiprocessing.Array 是进程安全的,所以不用使用lock

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process, Value, Array

def f(n, a):
n.value += 1
print 'arr pre', arr[:]
for i in range(len(a)):
a[i] = -a[i]
print 'arr post', arr[:]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
ps = [Process(target=f, args=(num, arr)) for i in xrange(0, 10)]
for p in ps:
p.start()
p.join()
print num.value
print arr[:]

注意:上面Value和Array的定义方式。Value 和 Array 都需要设置其中存放值的类型,d 是 double 类型,i 是 int 类型,具体的对应关系在Python 标准库的 sharedctypes 模块中查看。如有需要请参考https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.sharedctypes。

共享进程

使用一个进程来管理需要在多进程中共享的数据。其他进程可以对其管理的数据进行操作。如下图所示:

为了理解multiprocessing使用一个进程来共享数据的机制,我们需要理解如下四者的关系:

class multiprocessing.managers.BaseManager([address[, authkey]])

Proxy

class multiprocessing.managers.SyncManager :是一个已经注册了常用共享对象的 BaseManager, 是BaseManager的子类

multiprocessing.Manager() :是一个能够返回 已经start的 SyncManager 对象的函数

BaseManager是用来管理共享对象的进程,通过对外开放代理 Proxy 使得其他进程在进程安全的情况下操作共享对象。

BaseManager 关键函数:

1
class multiprocessing.managers.BaseManager([address[, authkey]])  # address BaseManager进程运行的host的ip:port ; authkey: 密码

用于初始化 BaseManager 对象。
该方法需要在两个地方使用:

  1. Manager的初始化,初始化之后使用 start() 启动 manager
  2. 当有以个进程需要方位manager管理的对象时,需要使用该函数初始化 Manager,其 address 和 authkey 需要和被访问的 manager相同。初始化后使用 connect() 连接
1
start([initializer[, initargs]])

Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.

1
get_server()

返回 manager 对象,manager 对象可以使用 serve_forever() 启动manager

1
connect()

连接 manager

1
shutdown()

Stop the process used by the manager. This is only available if start() has been used to start the server process.

1
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

用于注册代理 。注册一个获取manager所管理对象的proxy。具体使用方法见例子

typeid:用户获取被管理对象的proxy

callable是一个能够返回需要管理对象的函数

实例:使用一个master进程分发任务,slave进程用于处理任务并将任务返回。master和slave使用manager通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import random, time
from Queue import Queue
from multiprocessing.managers import BaseManager

class Master:
def __init__(self):
# 派发出去的作业队列
self.dispatched_job_queue = Queue()
# 完成的作业队列
self.finished_job_queue = Queue()

def get_dispatched_job_queue(self):
return self.dispatched_job_queue

def get_finished_job_queue(self):
return self.finished_job_queue

def start(self, tasks):
# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

# 监听端口和启动服务
manager = BaseManager(address=('localhost', 58881), authkey='jobs')
manager.start()
print('manager started...')

# 使用上面注册的方法获取队列
dispatched_jobs = manager.get_dispatched_job_queue() # 实际上是一个 proxy
print(type(dispatched_jobs))
finished_jobs = manager.get_finished_job_queue()

for t in TASKS:
print(t)
dispatched_jobs.put(t)

while not dispatched_jobs.empty():
res = finished_jobs.get()
print(res)
dispatched_jobs.put('EXIT')
time.sleep(1)
manager.shutdown()

def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def plus(a, b):
time.sleep(0.5*random.random())
return a + b

TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]

if __name__ == "__main__":
master = Master()
master.start(TASKS)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import time, random
from Queue import Queue
from multiprocessing.managers import BaseManager

class Slave:

def __init__(self):
# 派发出去的作业队列
self.dispatched_job_queue = Queue()
# 完成的作业队列
self.finished_job_queue = Queue()

def start(self):
# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_dispatched_job_queue')
BaseManager.register('get_finished_job_queue')

# 连接master
server = 'localhost'
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, 58881), authkey='jobs')
manager.connect()

# 使用上面注册的方法获取队列
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()

# 运行作业并返回结果
while True:
func_args = dispatched_jobs.get()
if func_args == 'EXIT':
break
func = func_args[0]
args = func_args[1]
res = func(*args)
print(args, res)
finished_jobs.put(res)

# 用于slave和master有时候是在不同 host (或进程)中完成的,所有mul 和plus需要在master和slave间共享
# 实际应用中可以让master和slave共享 mul和plus 所在的模块的方法解决
def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def plus(a, b):
time.sleep(0.5*random.random())
return a + b

if __name__ == "__main__":
slave = Slave()
slave.start()

进程池Pool

Pool

定义:

1
2
3
4
5
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
processes: # 同时运行的进程(worker)数量
initializer: # 每个worker初始化函数
initargs: # initializer的参数
maxtasksperchild: # 每个worker执行这么多task后退出。因为在Pool结束前,所有的worker都是live状态,占用资源,限制该值后,让worker退出,从而释放一定的资源,我自己测试后发现在我的机器上没有什么作用

关键函数:

apply_async(func[, args[, kwds[, callback]]])
callback: 回调函数,当task执行结束以后,处理返回值。
用于传递不定参数,它是 非阻塞的且支持结果返回后进行回调。

map(func, iterable[, chunksize])
是内置函数map的多进程版本,在所有的task执行完成之前会阻塞主进程

map_async(func, iterable[, chunksize[, callback]])
是map的变体,它是 非阻塞的。但是如果传入了callback,当子进程结果ready以后,callback会执行并阻塞主进程(func是在子进程中异步执行的)。callback是在主进程中执行的。

imap(func, iterable[, chunksize])
itertools.imap()的多进程版本。返回IMapIterator迭代器对象

imap_unordered(func, iterable[, chunksize])
和imap功能相同,但是其结果是无序的。返回IMapUnorderedIterator 迭代器对象

close()
执行该函数后,pool中不能再加入新的task

terminate()
直接终止进程池中的所有task,如果有未执行结束的task,其结果将会丢失。

join()
等待所有task结束,阻塞主进程。执行join()之前必须先执行close()否则会出错。

AsyncResult& MapResult

1
class multiprocessing.pool.AsyncResult

Pool.apply_async() 返回结果保存在AsyncResult对象中,AsyncResult接收异步结果。

get([timeout])
获取进程执行的结果,如果结果没有available,则阻塞主线程并直到 result is available或者timeout。

wait([timeout])
等待进程返回结果。等待时阻塞主线程,直到 result is available或者timeout。

ready()
返回boolean值,表示进程是否已经返回结果

successful()
和read()作用相同,但是如果未ready则会产生异常

1
class MapResult(ApplyResult)

Pool.map_async()返回结果保存在MapResult对象中,MapResult继承自ApplyResult对外提供get、wait、ready和successful四个方法。MapResult接收异步结果。

其中get() 获取的是多个进程结果组成的list的对象。

IMapIterator&IMapUnorderedIterator

imap,imap_unordered返回的结果对象

next([timeout])

用于迭代获取下一个result

实例:

close() 、 terminate()、 join()的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import multiprocessing
import time
import random
import sys

def pow3(x):
time.sleep(0.01)
return x**3
if __name__ == "__main__":
# Create pool
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)

print 'Testing close():'
results = [pool.apply_async(pow3, (i,)) for i in range(100)]
pool.close()
pool.join()
for r in results:
# print r.get()
assert r.get() is None
for worker in pool._pool: # pool._pool 是进程Process的对象集合 # type(worker) == <class 'multiprocessing.process.Process'>
assert not worker.is_alive()
print '\tclose() succeeded\n'

print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'

apply_async、 map、 imap、 imap_unordered

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import multiprocessing
import time
import random
import sys

# Functions used by test code
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)

def calculatestar(args):
return calculate(*args)

def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def plus(a, b):
time.sleep(0.5*random.random())
return a + b

if __name__ == "__main__":
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
# Create pool
PROCESSES = 4
pool = multiprocessing.Pool(PROCESSES)

# Tasks
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]

results = [pool.apply_async(calculate, t) for t in TASKS]
print 'Ordered results using pool.apply_async():'
print type(results)
for r in results:
print type(r), '\t', r.get() # 每个结果都是 ApplyResult 对象
print
imap_it = pool.imap(calculatestar, TASKS)
print 'Ordered results using pool.imap():'
print type(imap_it)# IMapIterator 迭代器对象
for x in imap_it:
print '\t', x
print
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Unordered results using pool.imap_unordered():'
print type(imap_unordered_it) # IMapUnorderedIterator 迭代器对象
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
map_it = pool.map(calculatestar, TASKS)
print type(map_it) # 由于Map是阻塞的,所有返回结果的 list
for x in map_it:
print '\t', x
print

imap、 imap_unordered结果迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import multiprocessing
import time
import random
import sys

# Functions used by test code
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)

def calculatestar(args):
return calculate(*args)

def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def plus(a, b):
time.sleep(0.5*random.random())
return a + b

if __name__ == "__main__":
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)

# Tasks
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]

print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.01))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')

print 'Testing IMapUnorderedIterator.next() with timeout:',
it = pool.imap_unordered(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.01))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')

回调函数的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import multiprocessing
import time
import random
import sys

# Functions used by test code
def mul(a, b):
time.sleep(0.5*random.random())
return a * b

def pow3(x):
return x**3

if __name__ == "__main__":
# Create pool
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool

# Testing callback
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

# wait
r = pool.apply_async(mul, (7, 8), callback=A.append) #
r.wait() # 使用wait等待子进程的结果ready。
print 'try1:', A
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
print 'map_async result', r.get()
print 'try2:', A

print 'no wait ========='
A = []

## no wait
r = pool.apply_async(mul, (7, 8), callback=A.append) #
print 'try1:', A
r = pool.map_async(pow3, range(10), callback=A.extend)
print 'try2:', A
for i in xrange(5):
print A
time.sleep(1)

杂项

  • multiprocessing.active_children()

    Return list of all live children of the current process.Calling this has the side effect of “joining” any processes which have already finished.

  • multiprocessing.cpu_count()

    Return the number of CPUs in the system. May raise NotImplementedError.

  • multiprocessing.current_process()
    Return the Process object corresponding to the current process.

参考资料

multiprocessing— Process-based “threading” interface

使用进程共享实现机器之间通信

https://www.cnblogs.com/sherlockhomles/p/8421075.html