multiprocessing—进程
提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。引入和pool对象(进程池),提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
>>>[1,4,9]
1、类
1)Process类:通过创建一个 Process 对象然后调用它的 start()
方法来生成进程。
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
应始终使用关键字参数调用构造函数。 group 应该始终是 None
;它仅用于兼容 threading.Thread 。 target 是由 run() 方法调用的可调用对象。它默认为 None
,意味着什么都没有被调用。 name 是进程名称(有关详细信息,请参阅 name )。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程 daemon 标志设置为 True
或 False
。如果是 None
(默认值),则该标志将从创建的进程继承。
方法:
1)run():表示进程活动的方法。
2)start():启动进程活动。
3)join(timeout):如果timeout是None,该方法将阻塞,知道调用join()方法的进程终止,如果timeout是正数,最多阻塞timeout秒,如果进程终止或方法超时,该方法返回None。一个进程可以join多次。无法join自己,会导致死锁,
4)name:进程名称
5)is_alive():返回进程是否存活。
6)daemon:进程的守护标志,一个bool值。
7)pid:返回进程ID。
8)exitcode:退出状态码。
9)authkey:进程的身份验证秘钥
10)sentinel:系统对象的数字句柄,当进程结束时变为ready
11)terminate():终止进程。
12)kill():终止进程。
13)close():关闭Process对象。
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
2)管道和队列:
Pipe(duplex):返回一对Connection对象,分别表示管道两端。如果duplex被置为TRUE,那么该管道是双向的,如果是FALSE,那么是单向的。
Queue(maxsize):返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中,一个写如线程会启动并将对象从缓存区写入管道中。
qsize():返回队列长度。
empty():如果队列为空,返回TRUE,反之FALSE。
full():如果队列满的,返回TRUE,反之FALSE。
put(obj,block,timeout):将 obj 放入队列。如果可选参数 block 是 True
(默认值) 而且 timeout 是 None
(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之 (block 是 False
时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。
put_noawait(obj):相当于put(obj,FALSE)
get(block,timeout):从队列中取出并返回对象。如果可选参数 block 是 True
(默认值) 而且 timeout 是 None
(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False
时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。
get_noawait():相当于get(FALSE)
close():指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。
join_thread():等待后台线程。这个方法仅在调用了 close() 方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。
cancel_join_thread():防止 join_thread() 方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。
SimpleQueue:一个简化的Queue类的实现。
close():关闭队列,被关闭后不能不能在被使用。
empty():队列为空返回TRUE,否则FALSE。
get():从队列中移出并返回一个对象。
put():将item放入队列。
JoinableQueue(maxsize):是Queue的子类。
task_done():指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用 get() 获取的任务,执行完成后调用 task_done() 告诉队列该任务已经处理完成。
如果 join() 方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用 put() 放进队列中的所有对象都已经返回了对应的 task_done() ) 。
如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。
join():阻塞至队列中所有的元素都被接收和处理完毕。
当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join()
阻塞被解除。
3)杂项:
active_children():返回当前进程存活的子进程列表。
cpu_count():返回系统CPU数量。
current_process():返回与当前进程相对的process对象。
parent_process():返回父进程process对象,和父进程调用 current_process() 返回的对象一样。如果一个进程已经是主进程, parent_process
会返回 None
.
free_support():冻结可执行文件。
get_all_start_methods():返回支持的启动方法列表,该列表的首项为默认选项。
get_context(method=None):返回一个Context对象,如果 method 设置成 None
那么将返回默认上下文对象。
get_start_method(allow_none=FALSE):返回启动进程时使用的方法名。
set_executable(executable):设置启动子进程时要使用的 Python 解释器的路径。
4)连接对象:
connection.Connection:允许收发可以序列化的对象或字符串,可以看做面向消息的连接套接字。
1)send(obj):将一个对象发送到连接的另一端。
2)recv():返回一个由另一端使用send(obj)发送的对象。
3)fileno():返回由连接对象使用的描述符或句柄。
4)close():关闭连接对象。
5)poll(timeout):返回连接对象中是否有可以读取的数据。如果未指定 timeout ,此方法会马上返回。如果 timeout 是一个数字,则指定了最大阻塞的秒数。如果 timeout 是 None
,那么将一直等待,不会超时。
6)send_bytes(buffer,offset,size):从一个字节类对象中取出字节数组并作为一条完整的消息发送。如果由 offset 给定了在 buffer 中读取数据的位置。 如果给定了 size ,那么将会从缓冲区中读取多个字节。
7)recv_bytes(maxlength):以字符串形式返回一条从连接对象另一端发送过来的字节数据,如果设置了maxlength并且消息长度超过maxlength的值,将抛出异常且连接对象不在可读。
8)recv_bytes_into(buffer,offset):将一条完整的字节数据消息读入buffer并返回消息的字节数。
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
5)同步原语:
Barrier(parties,action,timeout):类似于线程的栅栏对象。
BoundedSemaphore(value):类似于线程的有界信号对象。
Condition(lock):条件变量,线程的别名。
Event:类似线程的事件。
Lock:原始锁对象,一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。任何进程或线程都可以释放锁。
1)acquire(block=True,timeout=None):获得锁,阻塞或非阻塞的。
2)release():释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。
Rlock:递归锁对象,递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。
1)acquire(block=True,timeout=None):获得锁,阻塞或非阻塞的。
2)release():释放锁,锁内的递归等级减一。
Semaphore(value):信号对象。
6)共享ctypes对象:
Value(typecode_or_type,*args,lock=True)
返回一个从共享内存上创建的 ctypes 对象。默认情况下返回的对象实际上是经过了同步器包装过的。可以通过 Value 的 value 属性访问这个对象本身。
typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。
如果 lock 参数是 True
(默认值), 将会新建一个递归锁用于同步对于此值的访问操作。 如果 lock 是 Lock 或者 RLock 对象,那么这个传入的锁将会用于同步对这个值的访问操作,如果 lock 是 False
, 那么对这个对象的访问将没有锁保护,也就是说这个变量不是进程安全的。
Array(typecode_or_type,size_or_initializer,*,lock=True)
从共享内存中申请并返回一个具有ctypes类型的数组对象。默认情况下返回值实际上是被同步器包装过的数组对象。
typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。
如果 lock 为 True
(默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 Lock 或 RLock 对象则该对象将被用于同步对值的访问。 如果 lock 为 False
则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。
请注意 lock 是一个仅限关键字参数。
sharedctypes.RawArray(typecode_or_type,size_or_initializer):
从共享内存中申请并返回一个 ctypes 数组。
typecode_or_type 指明了返回的数组中的元素类型: 它可能是一个 ctypes 类型或者 array 模块中使用的类型字符。 如果 size_or_initializer 是一个整数,那就会当做数组的长度,并且整个数组的内存会初始化为0。否则,如果 size_or_initializer 会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。
sharedctypes.RawValue(typecode_or_type,*args):
从共享内存中申请并返回一个 ctypes 对象。
typecode_or_type 指明了返回的对象类型: 它可能是一个 ctypes 类型或者 array 模块中每个类型对应的单字符长度的字符串。 *args 会透传给这个类的构造函数。
sharedctypes.Array(typecode_or_type,size_or_initializer,*,lock=True):
返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray() 一样。
如果 lock 为 True
(默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 Lock
或 RLock
对象则该对象将被用于同步对值的访问。 如果 lock 为 False
则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。
注意 lock 只能是命名参数。
sharedctypes.Value(typecode_or_type,*args,lock=True):
返回一个纯 ctypes 数组, 或者在此之上经过同步器包装过的进程安全的对象,这取决于 lock 参数的值,除此之外,和 RawArray()
一样。
如果 lock 为 True
(默认值) 则将创建一个新的锁对象用于同步对值的访问。 如果 lock 为一个 Lock
或 RLock
对象则该对象将被用于同步对值的访问。 如果 lock 为 False
则对返回对象的访问将不会自动得到锁的保护,也就是说它不是“进程安全的”。
注意 lock 只能是命名参数。
sharedctypes.copy(obj):
从共享内存中申请一片空间将 ctypes 对象 obj 过来,然后返回一个新的 ctypes 对象。
sharedctypes.synchronized(obj,lock):
将一个 ctypes 对象包装为进程安全的对象并返回,使用 lock 同步对于它的操作。如果 lock 是 None
(默认值) ,则会自动创建一个 multiprocessing.RLock 对象。
同步器包装后的对象会在原有对象基础上额外增加两个方法: get_obj()
返回被包装的对象, get_lock()
返回内部用于同步的锁。
7、管理器:
Manager():返回一个已启动的管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已启动的子进程,并且拥有一系列方法有用于创建共享对象,返回对应的代理。当管理器被垃圾回收或者父进程退出时,管理器进程会立即退出。
manager.BaseManager(address,authkey):创建一个对象。
address是管理器服务进程监听的地址,如果address是None,允许和任意主机的请求简历连接。
authkey是认证标识,用于检查连接服务进程的请求合法性。
start(initializer,initargs):为管理器开启一个子进程。
get_server():返回一个server对象,它是管理器在后台控制的真实的服务。 Server
对象拥有 serve_forever()
方法。
connect():将本地管理器对象连接到一个远程管理器进程。
shutdown():停止管理器进程,
register(typeid,callable,proxytype,exposed,method_to_typeid,create_method):
一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
typeid 是一种 “类型标识符”,用于唯一表示某种共享对象类型,必须是一个字符串。
callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False
,那么这里可留下 None
。
proxytype 是 BaseProxy 的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None
, 则会自动创建一个代理类。
exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposed 是 None
, 则会在 proxytype._exposed_
存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 __call__() 方法并且不是以 '_'
开头的属性)
method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是 None
, 则 proxytype._method_to_typeid_
会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None
,则方法返回的对象会是一个值拷贝。
create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True
。
address:管理器所用地址。
managers.SyncManager:
Barrier(parties,action,timeout):创建一个共享的threading.Barrier对象并返回它的代理。
BoundedSemaphore(value):创建一个共享的threading.BoundedSemaphore对象并返回它的代理。
Coundition(lock):创建一个共享的 threading.Condition 对象并返回它的代理。
Event():创建一个共享的 threading.Event 对象并返回它的代理。
Lock():创建一个共享的 threading.Lock 对象并返回它的代理。
Namespace():创建一个共享的 Namespace 对象并返回它的代理。
Queue(maxsize):创建一个共享的 queue.Queue 对象并返回它的代理。
RLock():创建一个共享的 threading.RLock 对象并返回它的代理。
Semaphore(value):创建一个共享的 threading.Semaphore 对象并返回它的代理。
Array(typecode,sequence):创建一个数组并返回它的代理。
Value(typecode,value):创建一个具有可写 value
属性的对象并返回它的代理。
dict():创建一个共享的 dict 对象并返回它的代理。
list():创建一个共享的 list 对象并返回它的代理。
managers.Namespace:
一个可以注册到SyncManager的类型。
8)进程池:
pool.Pool(process,initializer,initargs,maxtasksperchild,context):
一个进程池对象,它控制可以提交作业的工作进程池。支持带有超时和回调的异步结果,以及一个并行的map实现。
process:要是用的工作进程数目。
initializer:如果不为None,则每个工作进程将在启动时调用initializer(*initargs)
maxtasksperchild :一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。默认的 maxtasksperchild 是 None
,意味着工作进程寿与池齐。
context:可被用于指定启动的工作进程的上下文。
方法:
1)apply(func,args,kwds):使用 args 参数以及 kwds 命名参数调用 func , 它会返回结果前阻塞。 func 只会在一个进程池中的一个工作进程中执行。
2)apply_async(func,args,kwds,callback,error_back):apply() 方法的一个变种,返回一个 AsyncResult 对象。
3)map(func,iterable,chunksize):内置 map() 函数的并行版本 但它只支持一个 iterable 参数。
4)map_async(func,iterable,chunksize,error_callback):map() 方法的一个变种,返回一个 AsyncResult 对象。
5)imap(func,iterable,chunksize):map() 的延迟执行版本。
6)imap_unordered(func,iterable,chunksize):和 imap() 相同,只不过通过迭代器返回的结果是任意的。(当进程池中只有一个工作进程的时候,返回结果的顺序才能认为是”有序”的)。
7)starmap(func,iterable,chunksize):与 map() 类似,不同之处在于可迭代对象的元素应该是作为参数解压缩的可迭代对象。
8)starmap_async(func,iterable,chunksize,callback,error_callback):相当于 starmap() 与 map_async() 的结合,迭代 iterable 的每一项,解包作为 func 的参数并执行,返回用于获取结果的对象。
9)close():阻止后续任务提交到进程池,当所有任务执行完成后,工作进程会退出。
10)terminate():不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()。
11)join():等待工作进程结束。调用 join() 前必须先调用 close() 或者 terminate() 。
pool.AsyncResult:Pool.apply_async() 和 Pool.map_async() 返回对象所属的类。
方法:
1)get(timeout):获取执行结果。
2)wait(timeout):阻塞,直到返回结果。
3)ready():返回执行状态,是否完成。
4)successful():判断调用是否已经完成并且未引发异常。
9、监听器及客户端:
connection.deliver_challenge(connection,authkey):发送一个随机生成的消息到另一端,并等待回复。
connection.answer_challenge(connection,authkey):接收一条消息,使用authkey作为键计算信息的摘要,然后将摘要发送回去。
connection.Client(address,family,authkey):使用地址上的监听器简历一个连接。
connection.Listener(address,family,backlog,authkey):监听连接请求。
1)accept:接受一个连接并返回一个Connection对象。
2)close():关闭监听器对象上的绑定套接字或者命名管道。此函数会在监听器被垃圾回收后自动调用。不过仍然建议显式调用函数关闭。
3)address:监听器对象使用的地址。
4)last_accepted:最后一个连接所使用的地址。如果没有的话就是 None
。
connection.wait(objectlist,timeout=None):一直等待直到 object_list 中某个对象处于就绪状态。