Código fuente: Lib / multiprocesamiento /
Introducción
multiprocessing
es un paquete que admite procesos de generación mediante una API similar a la threading
módulo. los multiprocessing
El paquete ofrece simultaneidad local y remota, evitando de manera efectiva el Bloqueo de intérprete global mediante el uso de subprocesos en lugar de subprocesos. Debido a esto, el multiprocessing
permite al programador aprovechar completamente varios procesadores en una máquina determinada. Funciona tanto en Unix como en Windows.
los multiprocessing
El módulo también presenta API que no tienen análogos en el threading
módulo. Un buen ejemplo de esto es el Pool
objeto que ofrece un medio conveniente para paralelizar la ejecución de una función a través de múltiples valores de entrada, distribuyendo los datos de entrada entre procesos (paralelismo de datos). El siguiente ejemplo demuestra la práctica común de definir tales funciones en un módulo para que los procesos secundarios puedan importar ese módulo con éxito. Este ejemplo básico de paralelismo de datos usando Pool
,
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3]))
imprimirá en salida estándar
[1, 4, 9]
los Process
clase
En multiprocessing
, los procesos se generan creando un Process
objeto y luego llamar a su start()
método. Process
sigue la API de threading.Thread
. Un ejemplo trivial de un programa multiproceso es
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
Para mostrar los ID de proceso individuales involucrados, aquí hay un ejemplo ampliado:
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join()
Para obtener una explicación de por qué if __name__ == '__main__'
parte es necesaria, ver Pautas de programación.
Contextos y métodos de inicio
Dependiendo de la plataforma, multiprocessing
admite tres formas de iniciar un proceso. Estas métodos de inicio están
- Aparecer
-
El proceso padre inicia un nuevo proceso de intérprete de Python. El proceso hijo solo heredará los recursos necesarios para ejecutar el proceso del objeto
run()
método. En particular, no se heredarán los descriptores de archivo y los identificadores innecesarios del proceso principal. Iniciar un proceso con este método es bastante lento en comparación con usar tenedor o tenedor.Disponible en Unix y Windows. El predeterminado en Windows y macOS.
- tenedor
-
El proceso padre usa
os.fork()
para bifurcar el intérprete de Python. El proceso hijo, cuando comienza, es efectivamente idéntico al proceso padre. Todos los recursos del padre son heredados por el proceso hijo. Tenga en cuenta que la bifurcación segura de un proceso multiproceso es problemática.Disponible solo en Unix. El predeterminado en Unix.
- tenedor
-
Cuando el programa se inicia y selecciona el tenedor método de inicio, se inicia un proceso de servidor. A partir de ese momento, cada vez que se necesita un nuevo proceso, el proceso padre se conecta al servidor y solicita que bifurque un nuevo proceso. El proceso del servidor de bifurcación es de un solo subproceso, por lo que es seguro de usar
os.fork()
. No se heredan recursos innecesarios.Disponible en plataformas Unix que admiten el paso de descriptores de archivos a través de tuberías Unix.
Modificado en la versión 3.8: En macOS, el Aparecer El método de inicio ahora es el predeterminado. los tenedor El método de inicio debe considerarse inseguro, ya que puede provocar fallas en el subproceso. Ver bpo-33725.
Modificado en la versión 3.4: Aparecer agregado en todas las plataformas Unix, y tenedor agregado para algunas plataformas Unix. Los procesos secundarios ya no heredan todos los identificadores heredables principales en Windows.
En Unix usando el Aparecer o tenedor Los métodos de inicio también iniciarán un rastreador de recursos proceso que rastrea los recursos del sistema con nombre no vinculados (como semáforos con nombre o SharedMemory
objetos) creados por procesos del programa. Cuando todos los procesos han salido, el rastreador de recursos desvincula cualquier objeto rastreado restante. Por lo general, no debería haber ninguno, pero si una señal mata un proceso, puede haber algunos recursos “filtrados”. (Ni los semáforos filtrados ni los segmentos de memoria compartida se desvincularán automáticamente hasta el próximo reinicio. Esto es problemático para ambos objetos porque el sistema permite solo un número limitado de semáforos con nombre y los segmentos de memoria compartida ocupan algo de espacio en la memoria principal).
Para seleccionar un método de inicio, utilice el set_start_method()
en el if __name__ == '__main__'
cláusula del módulo principal. Por ejemplo:
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
set_start_method()
no debe usarse más de una vez en el programa.
Alternativamente, puede usar get_context()
para obtener un objeto de contexto. Los objetos de contexto tienen la misma API que el módulo de multiprocesamiento y permiten utilizar varios métodos de inicio en el mismo programa.
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
Tenga en cuenta que los objetos relacionados con un contexto pueden no ser compatibles con los procesos de un contexto diferente. En particular, las cerraduras creadas con el tenedor El contexto no se puede pasar a los procesos iniciados utilizando el Aparecer o tenedor métodos de inicio.
Una biblioteca que quiera usar un método de inicio particular probablemente debería usar get_context()
para evitar interferir con la elección del usuario de la biblioteca.
Advertencia
los 'spawn'
y 'forkserver'
Los métodos de inicio no se pueden utilizar actualmente con ejecutables “congelados” (es decir, binarios producidos por paquetes como PyInstaller y cx_Freeze) en Unix. los 'fork'
El método de inicio funciona.
Intercambio de objetos entre procesos
multiprocessing
admite dos tipos de canales de comunicación entre procesos:
Colas
los Queue
la clase es casi un clon de queue.Queue
. Por ejemplo:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
Las colas son seguras para procesos y subprocesos.
Tubería
los Pipe()
La función devuelve un par de objetos de conexión conectados por una tubería que por defecto es dúplex (bidireccional). Por ejemplo:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Los dos objetos de conexión devueltos por Pipe()
representan los dos extremos de la tubería. Cada objeto de conexión tiene send()
y recv()
métodos (entre otros). Tenga en cuenta que los datos de una tubería pueden dañarse si dos procesos (o subprocesos) intentan leer o escribir en el mismo extremo de la tubería al mismo tiempo. Por supuesto, no hay riesgo de corrupción de procesos que utilizan diferentes extremos de la tubería al mismo tiempo.
Sincronización entre procesos
multiprocessing
contiene equivalentes de todas las primitivas de sincronización de threading
. Por ejemplo, se puede usar un candado para asegurarse de que solo un proceso se imprima en la salida estándar a la vez:
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
Sin utilizar la salida de bloqueo de los diferentes procesos, es probable que se confundan todos.
Compartiendo estado entre procesos
Como se mencionó anteriormente, cuando se realiza una programación simultánea, generalmente es mejor evitar el uso del estado compartido en la medida de lo posible. Esto es particularmente cierto cuando se utilizan varios procesos.
Sin embargo, si realmente necesita utilizar algunos datos compartidos, multiprocessing
proporciona un par de formas de hacerlo.
Memoria compartida
Los datos se pueden almacenar en un mapa de memoria compartida usando Value
o Array
. Por ejemplo, el siguiente código
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
imprimirá
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
los 'd'
y 'i'
argumentos utilizados al crear num
y arr
son códigos de tipo del tipo utilizado por el array
módulo: 'd'
indica un flotador de doble precisión y 'i'
indica un entero con signo. Estos objetos compartidos serán seguros para procesos y subprocesos.
Para mayor flexibilidad en el uso de la memoria compartida, se puede utilizar el multiprocessing.sharedctypes
módulo que admite la creación de objetos ctypes arbitrarios asignados desde la memoria compartida.
Proceso del servidor
Un objeto de administrador devuelto por Manager()
controla un proceso de servidor que contiene objetos Python y permite que otros procesos los manipulen usando proxies.
Un gerente regresó por Manager()
apoyará tipos list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
y Array
. Por ejemplo,
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
imprimirá
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Los administradores de procesos de servidor son más flexibles que el uso de objetos de memoria compartida porque se pueden hacer para admitir tipos de objetos arbitrarios. Además, los procesos de diferentes computadoras en una red pueden compartir un solo administrador. Sin embargo, son más lentos que el uso de memoria compartida.
Usar un grupo de trabajadores
los Pool
la clase representa un grupo de procesos de trabajo. Tiene métodos que permiten que las tareas se descarguen a los procesos de trabajo de diferentes formas.
Por ejemplo:
from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # start 4 worker processes with Pool(processes=4) as pool: # print "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # print same numbers in arbitrary order for i in pool.imap_unordered(f, range(10)): print(i) # evaluate "f(20)" asynchronously res = pool.apply_async(f, (20,)) # runs in *only* one process print(res.get(timeout=1)) # prints "400" # evaluate "os.getpid()" asynchronously res = pool.apply_async(os.getpid, ()) # runs in *only* one process print(res.get(timeout=1)) # prints the PID of that process # launching multiple evaluations asynchronously *may* use more processes multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # make a single worker sleep for 10 secs res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") print("For the moment, the pool remains available for more work") # exiting the 'with'-block has stopped the pool print("Now the pool is closed and no longer available")
Tenga en cuenta que los métodos de un grupo solo deben ser utilizados por el proceso que lo creó.
Nota
La funcionalidad dentro de este paquete requiere que el __main__
que los niños puedan importar el módulo. Esto está cubierto en Pautas de programación sin embargo, vale la pena señalarlo aquí. Esto significa que algunos ejemplos, como el multiprocessing.pool.Pool
los ejemplos no funcionarán en el intérprete interactivo. Por ejemplo:
>>> from multiprocessing import Pool >>> p = Pool(5) >>> def f(x): ... return x*x ... >>> with p: ... p.map(f, [1,2,3]) Process PoolWorker-1: Process PoolWorker-2: Process PoolWorker-3: Traceback (most recent call last): AttributeError: 'module' object has no attribute 'f' AttributeError: 'module' object has no attribute 'f' AttributeError: 'module' object has no attribute 'f'
(Si intenta esto, en realidad generará tres trazas completas intercaladas de manera semi-aleatoria, y luego es posible que deba detener el proceso principal de alguna manera).
Referencia
los multiprocessing
paquete en su mayoría replica la API del threading
módulo.
Process
y excepciones
-
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
-
Los objetos de proceso representan la actividad que se ejecuta en un proceso separado. los
Process
clase tiene equivalentes de todos los métodos dethreading.Thread
.El constructor siempre debe llamarse con argumentos de palabras clave. grupo siempre debe ser
None
; existe únicamente por compatibilidad conthreading.Thread
. objetivo es el objeto invocable que será invocado por elrun()
método. Por defecto esNone
, lo que significa que no se llama nada. nombre es el nombre del proceso (vername
para más detalles). argumentos es la tupla de argumentos para la invocación de destino. kwargs es un diccionario de argumentos de palabras clave para la invocación de destino. Si se proporciona, la palabra clave demonio argumento establece el procesodaemon
bandera aTrue
oFalse
. SiNone
(el valor predeterminado), esta bandera se heredará del proceso de creación.De forma predeterminada, no se pasan argumentos a objetivo.
Si una subclase anula el constructor, debe asegurarse de que invoca al constructor de la clase base (
Process.__init__()
) antes de hacer cualquier otra cosa en el proceso.Modificado en la versión 3.3: Agregó el demonio argumento.
-
run()
-
Método que representa la actividad del proceso.
Puede anular este método en una subclase. El estandar
run()
El método invoca el objeto invocable pasado al constructor del objeto como argumento de destino, si lo hay, con argumentos secuenciales y de palabra clave tomados del argumentos y kwargs argumentos, respectivamente.
-
start()
-
Inicie la actividad del proceso.
Esto se debe llamar como máximo una vez por objeto de proceso. Organiza el objeto
run()
método para ser invocado en un proceso separado.
-
join([timeout])
-
Si el argumento opcional se acabó el tiempo es
None
(el predeterminado), el método se bloquea hasta que el proceso cuyojoin()
se llama al método termina. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos. Tenga en cuenta que el método devuelveNone
si su proceso termina o si el método se agota. Verifique el procesoexitcode
para determinar si terminó.Un proceso se puede unir muchas veces.
Un proceso no puede unirse a sí mismo porque esto provocaría un interbloqueo. Es un error intentar unirse a un proceso antes de que se haya iniciado.
-
name
-
El nombre del proceso. El nombre es una cadena que se utiliza únicamente con fines de identificación. No tiene semántica. A varios procesos se les puede dar el mismo nombre.
El nombre inicial lo establece el constructor. Si no se proporciona un nombre explícito al constructor, un nombre de la forma ‘Proceso-N1:NORTE2:…:NORTEk‘se construye, donde cada Nk es el enésimo hijo de su padre.
-
is_alive()
-
Devuelve si el proceso está vivo.
Aproximadamente, un objeto de proceso está vivo desde el momento en que
start()
El método regresa hasta que termina el proceso hijo.
-
daemon
-
La bandera del demonio del proceso, un valor booleano. Esto debe establecerse antes
start()
se llama.El valor inicial se hereda del proceso de creación.
Cuando un proceso sale, intenta terminar todos sus procesos secundarios demoníacos.
Tenga en cuenta que un proceso demoníaco no puede crear procesos secundarios. De lo contrario, un proceso demoníaco dejaría huérfanos a sus hijos si se termina cuando sale su proceso padre. Además, estos son no Demonios o servicios Unix, son procesos normales que se terminarán (y no se unirán) si los procesos no demoníacos han salido.
Además de
threading.Thread
API,Process
Los objetos también admiten los siguientes atributos y métodos:-
pid
-
Devuelve el ID del proceso. Antes de que se genere el proceso, esto será
None
.
-
exitcode
-
El código de salida del niño. Esto será
None
si el proceso aún no ha terminado. Un valor negativo -NORTE indica que el niño fue despedido por señal norte.
-
authkey
-
La clave de autenticación del proceso (una cadena de bytes).
Cuando
multiprocessing
se inicializa al proceso principal se le asigna una cadena aleatoria usandoos.urandom()
.Cuando una
Process
se crea el objeto, heredará la clave de autenticación de su proceso padre, aunque esto se puede cambiar estableciendoauthkey
a otra cadena de bytes.Ver Claves de autenticación.
-
sentinel
-
Un identificador numérico de un objeto del sistema que estará “listo” cuando finalice el proceso.
Puede usar este valor si desea esperar en varios eventos a la vez usando
multiprocessing.connection.wait()
. De lo contrario llamandojoin()
es más simple.En Windows, este es un identificador de sistema operativo que se puede usar con el
WaitForSingleObject
yWaitForMultipleObjects
familia de llamadas API. En Unix, este es un descriptor de archivo que se puede utilizar con primitivas delselect
módulo.Nuevo en la versión 3.3.
-
terminate()
-
Termina el proceso. En Unix esto se hace usando el
SIGTERM
señal; en WindowsTerminateProcess()
se utiliza. Tenga en cuenta que los controladores de salida y las cláusulas finalmente, etc., no se ejecutarán.Tenga en cuenta que los procesos descendientes del proceso no terminar, simplemente quedarán huérfanos.
Advertencia
Si se usa este método cuando el proceso asociado está usando una tubería o una cola, entonces la tubería o la cola pueden corromperse y pueden volverse inutilizables por otro proceso. De manera similar, si el proceso ha adquirido un bloqueo o semáforo, etc., su terminación puede provocar que otros procesos se bloqueen.
-
kill()
-
Igual que
terminate()
pero usando elSIGKILL
señal en Unix.Nuevo en la versión 3.7.
-
close()
-
Cierra el
Process
objeto, liberando todos los recursos asociados con él.ValueError
se genera si el proceso subyacente aún se está ejecutando. Una vezclose()
devuelve con éxito, la mayoría de los otros métodos y atributos delProcess
el objeto se levantaráValueError
.Nuevo en la versión 3.7.
Tenga en cuenta que el
start()
,join()
,is_alive()
,terminate()
yexitcode
Los métodos solo deben ser llamados por el proceso que creó el objeto de proceso.Ejemplo de uso de algunos de los métodos de
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception multiprocessing.ProcessError
-
La clase base de todos
multiprocessing
excepciones.
-
exception multiprocessing.BufferTooShort
-
Excepción planteada por
Connection.recv_bytes_into()
cuando el objeto de búfer proporcionado es demasiado pequeño para la lectura del mensaje.Si
e
es una instancia deBufferTooShort
luegoe.args[0]
dará el mensaje como una cadena de bytes.
-
exception multiprocessing.AuthenticationError
-
Se genera cuando hay un error de autenticación.
-
exception multiprocessing.TimeoutError
-
Generado por métodos con un tiempo de espera cuando expira el tiempo de espera.
Tuberías y colas
Cuando se usan múltiples procesos, generalmente se usa el paso de mensajes para la comunicación entre procesos y se evita tener que usar primitivas de sincronización como bloqueos.
Para pasar mensajes, uno puede usar Pipe()
(para una conexión entre dos procesos) o una cola (que permite múltiples productores y consumidores).
los Queue
, SimpleQueue
y JoinableQueue
los tipos son de múltiples productores, de múltiples consumidores FIFO colas modeladas en el queue.Queue
clase en la biblioteca estándar. Se diferencian en eso Queue
carece del task_done()
y join()
métodos introducidos en Python 2.5’s queue.Queue
clase.
Si utiliza JoinableQueue
entonces tú debe llama JoinableQueue.task_done()
para cada tarea eliminada de la cola o, de lo contrario, el semáforo utilizado para contar el número de tareas sin terminar puede eventualmente desbordarse, generando una excepción.
Tenga en cuenta que también se puede crear una cola compartida utilizando un objeto de administrador; consulte Gerentes.
Nota
multiprocessing
usa el habitual queue.Empty
y queue.Full
excepciones para señalar un tiempo de espera. No están disponibles en el multiprocessing
espacio de nombres, por lo que debe importarlos desde queue
.
Nota
Cuando un objeto se coloca en una cola, el objeto se decapa y un subproceso de fondo luego descarga los datos decapados a una tubería subyacente. Esto tiene algunas consecuencias que son un poco sorprendentes, pero no deberían causar ninguna dificultad práctica; si realmente te molestan, puedes usar una cola creada con un gerente.
- Después de poner un objeto en una cola vacía, puede haber un retraso infinitesimal antes de que la cola
empty()
devuelve el métodoFalse
yget_nowait()
puede volver sin levantarqueue.Empty
. - Si varios procesos están colocando objetos en cola, es posible que los objetos se reciban en el otro extremo fuera de servicio. Sin embargo, los objetos puestos en cola por el mismo proceso siempre estarán en el orden esperado entre sí.
Advertencia
Si un proceso se mata usando Process.terminate()
o os.kill()
mientras intenta utilizar un Queue
, es probable que los datos de la cola se corrompan. Esto puede hacer que cualquier otro proceso obtenga una excepción cuando intente usar la cola más adelante.
Advertencia
Como se mencionó anteriormente, si un proceso hijo ha puesto elementos en una cola (y no ha utilizado JoinableQueue.cancel_join_thread
), ese proceso no terminará hasta que todos los elementos almacenados en búfer se hayan descargado en la tubería.
Esto significa que si intenta unirse a ese proceso, puede llegar a un punto muerto a menos que esté seguro de que se han consumido todos los elementos que se colocaron en la cola. De manera similar, si el proceso hijo no es demoníaco, entonces el proceso padre puede colgarse al salir cuando intente unirse a todos sus hijos no demoníacos.
Tenga en cuenta que una cola creada con un administrador no tiene este problema. Ver Pautas de programación.
Para ver un ejemplo del uso de colas para la comunicación entre procesos, consulte Ejemplos de.
-
multiprocessing.Pipe([duplex])
-
Devuelve un par
(conn1, conn2)
deConnection
objetos que representan los extremos de una tubería.Si dúplex es
True
(el valor predeterminado) entonces la tubería es bidireccional. Si dúplex esFalse
entonces la tubería es unidireccional:conn1
solo se puede utilizar para recibir mensajes yconn2
solo se puede utilizar para enviar mensajes.
-
class multiprocessing.Queue([maxsize])
-
Devuelve una cola de proceso compartida implementada mediante una tubería y algunos bloqueos / semáforos. Cuando un proceso coloca por primera vez un elemento en la cola, se inicia un subproceso alimentador que transfiere objetos desde un búfer a la tubería.
Lo normal
queue.Empty
yqueue.Full
excepciones de la biblioteca estándarqueue
módulo se elevan para señalar tiempos de espera.Queue
implementa todos los métodos dequeue.Queue
excepto portask_done()
yjoin()
.-
qsize()
-
Devuelve el tamaño aproximado de la cola. Debido a la semántica de multiproceso / multiprocesamiento, este número no es confiable.
Tenga en cuenta que esto puede aumentar
NotImplementedError
en plataformas Unix como Mac OS X dondesem_getvalue()
no está implementado.
-
empty()
-
Regreso
True
si la cola está vacía,False
de lo contrario. Debido a la semántica de multiproceso / multiprocesamiento, esto no es confiable.
-
full()
-
Regreso
True
si la cola está llena,False
de lo contrario. Debido a la semántica de multiproceso / multiprocesamiento, esto no es confiable.
-
put(obj[, block[, timeout]])
-
Pon obj en la cola. Si el argumento opcional cuadra es
True
(el predeterminado) y se acabó el tiempo esNone
(predeterminado), bloquee si es necesario hasta que haya un espacio libre disponible. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos y levanta elqueue.Full
Excepción si no hay espacio libre disponible dentro de ese tiempo. De lo contrario (cuadra esFalse
), ponga un elemento en la cola si hay un espacio libre disponible de inmediato; de lo contrario, aumente elqueue.Full
excepciónse acabó el tiempo se ignora en ese caso).Modificado en la versión 3.8: Si la cola está cerrada,
ValueError
se levanta en lugar deAssertionError
.
-
put_nowait(obj)
-
Equivalente a
put(obj, False)
.
-
get([block[, timeout]])
-
Eliminar y devolver un artículo de la cola. Si argumentos opcionales cuadra es
True
(el predeterminado) y se acabó el tiempo esNone
(el valor predeterminado), bloquee si es necesario hasta que un elemento esté disponible. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos y levanta elqueue.Empty
excepción si no había ningún artículo disponible dentro de ese tiempo. De lo contrario (el bloque esFalse
), devuelva un artículo si hay uno disponible de inmediato; de lo contrario, aumente elqueue.Empty
excepciónse acabó el tiempo se ignora en ese caso).Modificado en la versión 3.8: Si la cola está cerrada,
ValueError
se levanta en lugar deOSError
.
-
get_nowait()
-
Equivalente a
get(False)
.
multiprocessing.Queue
tiene algunos métodos adicionales que no se encuentran enqueue.Queue
. Estos métodos suelen ser innecesarios para la mayoría de los códigos:-
close()
-
Indique que el proceso actual no pondrá más datos en esta cola. El subproceso en segundo plano se cerrará una vez que haya vaciado todos los datos almacenados en búfer en la tubería. Este es se llama automáticamente cuando la cola se recolecta como basura.
-
join_thread()
-
Únase al hilo de fondo. Esto solo se puede usar después
close()
ha sido llamado. Se bloquea hasta que sale el subproceso de fondo, lo que garantiza que todos los datos del búfer se hayan descargado en la tubería.De forma predeterminada, si un proceso no es el creador de la cola, al salir intentará unirse al hilo de fondo de la cola. El proceso puede llamar
cancel_join_thread()
para hacerjoin_thread()
hacer nada.
-
cancel_join_thread()
-
Evitar
join_thread()
del bloqueo. En particular, esto evita que el subproceso en segundo plano se una automáticamente cuando se cierra el proceso; consultejoin_thread()
.Un mejor nombre para este método podría ser
allow_exit_without_flush()
. Es probable que provoque la pérdida de datos en cola y es casi seguro que no necesitará utilizarlos. En realidad, solo está allí si necesita que el proceso actual salga de inmediato sin esperar a descargar los datos en cola a la tubería subyacente, y no le importa la pérdida de datos.
Nota
La funcionalidad de esta clase requiere una implementación de semáforo compartida que funcione en el sistema operativo host. Sin uno, la funcionalidad de esta clase se desactivará e intentará crear una instancia
Queue
resultará en unImportError
. Ver bpo-3770 para informacion adicional. Lo mismo es válido para cualquiera de los tipos de cola especializados que se enumeran a continuación. -
-
class multiprocessing.SimpleQueue
-
Es un simplificado
Queue
tipo, muy cerca de un bloqueadoPipe
.-
close()
-
Cerrar la cola: liberar recursos internos.
Una cola no se debe utilizar más después de que se cierre. Por ejemplo,
get()
,put()
yempty()
ya no se deben llamar a los métodos.Nuevo en la versión 3.9.
-
empty()
-
Regreso
True
si la cola está vacía,False
de lo contrario.
-
get()
-
Eliminar y devolver un artículo de la cola.
-
put(item)
-
Poner artículo en la cola.
-
-
class multiprocessing.JoinableQueue([maxsize])
-
JoinableQueue
, aQueue
subclase, es una cola que además tienetask_done()
yjoin()
métodos.-
task_done()
-
Indica que una tarea en cola anteriormente está completa. Usado por consumidores de cola. Para cada
get()
utilizado para buscar una tarea, una llamada posterior atask_done()
le dice a la cola que el procesamiento de la tarea está completo.Si un
join()
está bloqueando actualmente, se reanudará cuando todos los elementos hayan sido procesados (lo que significa quetask_done()
se recibió una llamada por cada artículo que se habíaput()
en la cola).Plantea un
ValueError
si se llama más veces que los elementos colocados en la cola.
-
join()
-
Bloquear hasta que se hayan obtenido y procesado todos los elementos de la cola.
El recuento de tareas sin terminar aumenta cada vez que se agrega un elemento a la cola. El recuento desciende cada vez que un consumidor llama
task_done()
para indicar que se recuperó el elemento y que todo el trabajo en él está completo. Cuando el recuento de tareas sin terminar cae a cero,join()
desbloquea.
-
Diverso
-
multiprocessing.active_children()
-
Devuelve la lista de todos los hijos vivos del proceso actual.
Llamar a esto tiene el efecto secundario de “unirse” a cualquier proceso que ya haya finalizado.
-
multiprocessing.cpu_count()
-
Devuelve el número de CPU del sistema.
Este número no es equivalente al número de CPU que puede utilizar el proceso actual. El número de CPU utilizables se puede obtener con
len(os.sched_getaffinity(0))
Puede levantar
NotImplementedError
.Ver también
os.cpu_count()
-
multiprocessing.current_process()
-
Devuelve el
Process
objeto correspondiente al proceso actual.Un análogo de
threading.current_thread()
.
-
multiprocessing.parent_process()
-
Devuelve el
Process
objeto correspondiente al proceso padre delcurrent_process()
. Para el proceso principal,parent_process
estaránNone
.Nuevo en la versión 3.8.
-
multiprocessing.freeze_support()
-
Agregue soporte para cuando un programa que usa
multiprocessing
se ha congelado para producir un ejecutable de Windows. (Ha sido probado con py2exe, PyInstaller y cx_Freeze.)Es necesario llamar a esta función inmediatamente después de la
if __name__ ==
línea del módulo principal. Por ejemplo:
'__main__'from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Si el
freeze_support()
se omite la línea, luego, al intentar ejecutar el ejecutable congelado, se generaráRuntimeError
.Vocación
freeze_support()
no tiene ningún efecto cuando se invoca en cualquier sistema operativo que no sea Windows. Además, si el intérprete de Python ejecuta normalmente el módulo en Windows (el programa no se ha congelado), entoncesfreeze_support()
no tiene efecto.
-
multiprocessing.get_all_start_methods()
-
Devuelve una lista de los métodos de inicio admitidos, el primero de los cuales es el predeterminado. Los posibles métodos de inicio son
'fork'
,'spawn'
y'forkserver'
. Solo en Windows'spawn'
está disponible. En Unix'fork'
y'spawn'
siempre son compatibles, con'fork'
siendo el predeterminado.Nuevo en la versión 3.4.
-
multiprocessing.get_context(method=None)
-
Devuelve un objeto de contexto que tiene los mismos atributos que el
multiprocessing
módulo.Si método es
None
luego se devuelve el contexto predeterminado. De lo contrario método debiera ser'fork'
,'spawn'
,'forkserver'
.ValueError
se genera si el método de inicio especificado no está disponible.Nuevo en la versión 3.4.
-
multiprocessing.get_start_method(allow_none=False)
-
Devuelve el nombre del método de inicio utilizado para iniciar los procesos.
Si el método de inicio no se ha corregido y allow_none es falso, entonces el método de inicio se fija al predeterminado y se devuelve el nombre. Si el método de inicio no se ha corregido y allow_none es verdad entonces
None
es regresado.El valor de retorno puede ser
'fork'
,'spawn'
,'forkserver'
oNone
.'fork'
es el predeterminado en Unix, mientras que'spawn'
es el predeterminado en Windows.Nuevo en la versión 3.4.
-
multiprocessing.set_executable()
-
Establece la ruta del intérprete de Python que se utilizará al iniciar un proceso hijo. (Por defecto
sys.executable
se utiliza). Los incrustadores probablemente necesitarán hacer algo comoset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
antes de que puedan crear procesos secundarios.
Modificado en la versión 3.4: Ahora compatible con Unix cuando el
'spawn'
se utiliza el método de inicio.
-
multiprocessing.set_start_method(method)
-
Establezca el método que se debe utilizar para iniciar procesos secundarios. método puede ser
'fork'
,'spawn'
o'forkserver'
.Tenga en cuenta que esto debe llamarse como máximo una vez y debe protegerse dentro del
if __name__ == '__main__'
cláusula del módulo principal.Nuevo en la versión 3.4.
Nota
multiprocessing
no contiene análogos de threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, o threading.local
.
Objetos de conexión
Los objetos de conexión permiten el envío y la recepción de cadenas o objetos decapables. Se pueden considerar como enchufes conectados orientados a mensajes.
Los objetos de conexión se crean generalmente usando Pipe
– ver también Oyentes y Clientes.
-
class multiprocessing.connection.Connection
-
-
send(obj)
-
Envíe un objeto al otro extremo de la conexión que debe leerse usando
recv()
.El objeto debe ser decapado. Los encurtidos muy grandes (aproximadamente 32 MiB +, aunque depende del sistema operativo) pueden generar un
ValueError
excepción.
-
recv()
-
Devuelve un objeto enviado desde el otro extremo de la conexión usando
send()
. Bloquea hasta que haya algo que recibir. ElevaEOFError
si no queda nada para recibir y el otro extremo estaba cerrado.
-
fileno()
-
Devuelve el descriptor de archivo o el identificador utilizado por la conexión.
-
close()
-
Cierra la conexión.
Esto se llama automáticamente cuando la conexión se recolecta como basura.
-
poll([timeout])
-
Devuelve si hay datos disponibles para leer.
Si se acabó el tiempo no se especifica, volverá inmediatamente. Si se acabó el tiempo es un número, entonces esto especifica el tiempo máximo en segundos para bloquear. Si se acabó el tiempo es
None
entonces se usa un tiempo de espera infinito.Tenga en cuenta que se pueden sondear varios objetos de conexión a la vez mediante el uso de
multiprocessing.connection.wait()
.
-
send_bytes(buffer[, offset[, size]])
-
Enviar datos de bytes desde un objeto similar a bytes como un mensaje completo.
Si compensar se da, entonces los datos se leen desde esa posición en buffer. Si Talla Se da entonces que se leerán muchos bytes del búfer. Los búferes muy grandes (aproximadamente 32 MiB +, aunque depende del sistema operativo) pueden generar un
ValueError
excepción
-
recv_bytes([maxlength])
-
Devuelve un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión como una cadena. Bloquea hasta que haya algo que recibir. Eleva
EOFError
si no queda nada para recibir y el otro extremo se ha cerrado.Si longitud máxima se especifica y el mensaje es más largo que longitud máxima luego
OSError
se activa y la conexión ya no será legible.Modificado en la versión 3.3: Esta función solía aumentar
IOError
, que ahora es un alias deOSError
.
-
recv_bytes_into(buffer[, offset])
-
Interpretar buffer un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión y devuelve el número de bytes en el mensaje. Bloquea hasta que haya algo que recibir. Eleva
EOFError
si no queda nada para recibir y el otro extremo estaba cerrado.buffer debe ser un escribible objeto similar a bytes. Si compensar se da, entonces el mensaje se escribirá en el búfer desde esa posición. El desplazamiento debe ser un número entero no negativo menor que la longitud de buffer (en bytes).
Si el búfer es demasiado corto,
BufferTooShort
se genera una excepción y el mensaje completo está disponible comoe.args[0]
dóndee
es la instancia de excepción.
Modificado en la versión 3.3: Los propios objetos de conexión ahora se pueden transferir entre procesos utilizando
Connection.send()
yConnection.recv()
.Nuevo en la versión 3.3: Los objetos de conexión ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto.
__enter__()
devuelve el objeto de conexión y__exit__()
llamadasclose()
. -
Por ejemplo:
>>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes(b'thank you') >>> a.recv_bytes() b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Advertencia
los Connection.recv()
El método elimina automáticamente los datos que recibe, lo que puede suponer un riesgo para la seguridad a menos que pueda confiar en el proceso que envió el mensaje.
Por lo tanto, a menos que el objeto de conexión se haya producido utilizando Pipe()
solo debes usar el recv()
y send()
métodos después de realizar algún tipo de autenticación. Ver Claves de autenticación.
Advertencia
Si un proceso se mata mientras intenta leer o escribir en una tubería, es probable que los datos de la tubería se corrompan, porque puede resultar imposible estar seguro de dónde se encuentran los límites del mensaje.
Primitivas de sincronización
Generalmente, las primitivas de sincronización no son tan necesarias en un programa multiproceso como lo son en un programa multiproceso. Consulte la documentación para threading
módulo.
Tenga en cuenta que también se pueden crear primitivas de sincronización utilizando un objeto administrador; consulte Gerentes.
-
class multiprocessing.Barrier(parties[, action[, timeout]])
-
Un objeto barrera: un clon de
threading.Barrier
.Nuevo en la versión 3.3.
-
class multiprocessing.BoundedSemaphore([value])
-
Un objeto semáforo acotado: un análogo cercano de
threading.BoundedSemaphore
.Existe una diferencia solitaria de su análogo cercano: su
acquire
el primer argumento del método se llama cuadra, como es consistente conLock.acquire()
.Nota
En Mac OS X, esto es indistinguible de
Semaphore
porquesem_getvalue()
no está implementado en esa plataforma.
-
class multiprocessing.Condition([lock])
-
Una variable de condición: un alias para
threading.Condition
.Si cerrar con llave se especifica, entonces debería ser un
Lock
oRLock
objeto demultiprocessing
.Modificado en la versión 3.3: los
wait_for()
se agregó el método.
-
class multiprocessing.Event
-
Un clon de
threading.Event
.
-
class multiprocessing.Lock
-
Un objeto de bloqueo no recursivo: un análogo cercano de
threading.Lock
. Una vez que un proceso o subproceso ha adquirido un bloqueo, los intentos posteriores de adquirirlo de cualquier proceso o subproceso se bloquearán hasta que se libere; cualquier proceso o hilo puede liberarlo. Los conceptos y comportamientos dethreading.Lock
como se aplica a los subprocesos se replican aquí enmultiprocessing.Lock
ya que se aplica a procesos o subprocesos, excepto cuando se indique.Tenga en cuenta que
Lock
es en realidad una función de fábrica que devuelve una instancia demultiprocessing.synchronize.Lock
inicializado con un contexto predeterminado.Lock
apoya el administrador de contexto protocolo y, por lo tanto, se puede utilizar enwith
declaraciones.-
acquire(block=True, timeout=None)
-
Adquirir un candado, bloqueante o no bloqueante.
Con el cuadra argumento establecido en
True
(el valor predeterminado), la llamada al método se bloqueará hasta que el bloqueo esté en un estado desbloqueado, luego configúrelo como bloqueado y regreseTrue
. Tenga en cuenta que el nombre de este primer argumento difiere del dethreading.Lock.acquire()
.Con el cuadra argumento establecido en
False
, la llamada al método no se bloquea. Si la cerradura se encuentra actualmente en un estado bloqueado, regreseFalse
; de lo contrario, establezca el bloqueo en un estado bloqueado y vuelvaTrue
.Cuando se invoca con un valor de punto flotante positivo para se acabó el tiempo, bloquea como máximo el número de segundos especificado por se acabó el tiempo siempre que no se pueda adquirir el candado. Invocaciones con un valor negativo para se acabó el tiempo son equivalentes a un se acabó el tiempo de cero. Invocaciones con un se acabó el tiempo valor de
None
(el valor predeterminado) establece el período de tiempo de espera en infinito. Tenga en cuenta que el tratamiento de negativos oNone
valores para se acabó el tiempo difiere del comportamiento implementado enthreading.Lock.acquire()
. los se acabó el tiempo argumento no tiene implicaciones prácticas si el cuadra el argumento se establece enFalse
y por lo tanto se ignora. DevolucionesTrue
si la cerradura ha sido adquirida oFalse
si ha transcurrido el tiempo de espera.
-
release()
-
Suelta un candado. Esto se puede llamar desde cualquier proceso o subproceso, no solo el proceso o subproceso que originalmente adquirió el bloqueo.
El comportamiento es el mismo que en
threading.Lock.release()
excepto que cuando se invoca en un candado desbloqueado, unValueError
es elevado.
-
-
class multiprocessing.RLock
-
Un objeto de bloqueo recursivo: un análogo cercano de
threading.RLock
. Un bloqueo recursivo debe ser liberado por el proceso o subproceso que lo adquirió. Una vez que un proceso o subproceso ha adquirido un bloqueo recursivo, el mismo proceso o subproceso puede adquirirlo nuevamente sin bloquearse; ese proceso o subproceso debe liberarlo una vez por cada vez que se haya adquirido.Tenga en cuenta que
RLock
es en realidad una función de fábrica que devuelve una instancia demultiprocessing.synchronize.RLock
inicializado con un contexto predeterminado.RLock
apoya el administrador de contexto protocolo y, por lo tanto, se puede utilizar enwith
declaraciones.-
acquire(block=True, timeout=None)
-
Adquirir un candado, bloqueante o no bloqueante.
Cuando se invoca con el cuadra argumento establecido en
True
, bloquear hasta que el bloqueo esté en un estado desbloqueado (que no sea propiedad de ningún proceso o subproceso) a menos que el bloqueo ya sea propiedad del proceso o subproceso actual. El proceso o subproceso actual toma posesión del bloqueo (si aún no lo tiene) y el nivel de recursividad dentro del bloqueo se incrementa en uno, lo que da como resultado un valor de retorno deTrue
. Tenga en cuenta que hay varias diferencias en el comportamiento de este primer argumento en comparación con la implementación dethreading.RLock.acquire()
, comenzando con el nombre del argumento en sí.Cuando se invoca con el cuadra argumento establecido en
False
, no bloquees. Si el bloqueo ya ha sido adquirido (y por lo tanto es propiedad) de otro proceso o subproceso, el proceso o subproceso actual no toma posesión y el nivel de recursividad dentro del bloqueo no cambia, lo que da como resultado un valor de retorno deFalse
. Si el bloqueo está en un estado desbloqueado, el proceso o subproceso actual toma posesión y el nivel de recursividad se incrementa, lo que da como resultado un valor de retorno deTrue
.Uso y comportamientos del se acabó el tiempo argumento son los mismos que en
Lock.acquire()
. Tenga en cuenta que algunos de estos comportamientos de se acabó el tiempo difieren de los comportamientos implementados enthreading.RLock.acquire()
.
-
release()
-
Suelte un bloqueo, disminuyendo el nivel de recursividad. Si después del decremento el nivel de recursividad es cero, restablezca el bloqueo a desbloqueado (que no sea propiedad de ningún proceso o subproceso) y si cualquier otro proceso o subproceso está bloqueado esperando que el bloqueo se desbloquee, permita que continúe exactamente uno de ellos. Si después del decremento el nivel de recursividad sigue siendo distinto de cero, el bloqueo permanece bloqueado y es propiedad del proceso o subproceso que realiza la llamada.
Solo llame a este método cuando el proceso o subproceso que realiza la llamada sea propietario del bloqueo. Un
AssertionError
se genera si este método es llamado por un proceso o subproceso que no sea el propietario o si el bloqueo está en un estado desbloqueado (sin propietario). Tenga en cuenta que el tipo de excepción planteada en esta situación difiere del comportamiento implementado enthreading.RLock.release()
.
-
-
class multiprocessing.Semaphore([value])
-
Un objeto semáforo: un análogo cercano de
threading.Semaphore
.Existe una diferencia solitaria de su análogo cercano: su
acquire
el primer argumento del método se llama cuadra, como es consistente conLock.acquire()
.
Nota
En Mac OS X, sem_timedwait
no es compatible, así que llamando acquire()
con un tiempo de espera emulará el comportamiento de esa función utilizando un ciclo de suspensión.
Nota
Si la señal SIGINT generada por Ctrl-C llega mientras el hilo principal está bloqueado por una llamada a BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
o Condition.wait()
entonces la llamada se interrumpirá inmediatamente y KeyboardInterrupt
se levantará.
Esto difiere del comportamiento de threading
donde SIGINT se ignorará mientras las llamadas de bloqueo equivalentes estén en curso.
Nota
Algunas de las funciones de este paquete requieren una implementación de semáforo compartida que funcione en el sistema operativo host. Sin uno, el multiprocessing.synchronize
El módulo se deshabilitará y los intentos de importarlo darán como resultado un ImportError
. Ver bpo-3770 para informacion adicional.
Compartido ctypes
Objetos
Es posible crear objetos compartidos utilizando memoria compartida que puede ser heredada por procesos secundarios.
-
multiprocessing.Value(typecode_or_type, *args, lock=True)
-
Devolver un
ctypes
objeto asignado desde la memoria compartida. De forma predeterminada, el valor de retorno es en realidad un contenedor sincronizado para el objeto. Se puede acceder al objeto en sí a través del valor atributo de unValue
.typecode_or_type determina el tipo del objeto devuelto: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el
array
módulo. * argumentos se pasa al constructor para el tipo.Si cerrar con llave es
True
(valor predeterminado), se crea un nuevo objeto de bloqueo recursivo para sincronizar el acceso al valor. Si cerrar con llave es unLock
oRLock
objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave esFalse
entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.Operaciones como
+=
que implican una lectura y una escritura no son atómicas. Entonces, si, por ejemplo, desea incrementar atómicamente un valor compartido, no es suficiente con hacerlocounter.value += 1
Suponiendo que el bloqueo asociado es recursivo (que es de forma predeterminada), puede hacerlo
with counter.get_lock(): counter.value += 1
Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.
-
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
-
Devuelve una matriz ctypes asignada desde la memoria compartida. De forma predeterminada, el valor de retorno es en realidad un contenedor sincronizado para la matriz.
typecode_or_type determina el tipo de los elementos de la matriz devuelta: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el
array
módulo. Si size_or_initializer es un número entero, luego determina la longitud de la matriz, y la matriz se pondrá inicialmente a cero. De lo contrario, size_or_initializer es una secuencia que se utiliza para inicializar la matriz y cuya longitud determina la longitud de la matriz.Si cerrar con llave es
True
(predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es unLock
oRLock
objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave esFalse
entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.Tenga en cuenta que cerrar con llave es un argumento de solo palabra clave.
Tenga en cuenta que una matriz de
ctypes.c_char
tiene valor y crudo atributos que permiten usarlo para almacenar y recuperar cadenas.
los multiprocessing.sharedctypes
módulo
los multiprocessing.sharedctypes
El módulo proporciona funciones para asignar ctypes
objetos de la memoria compartida que pueden ser heredados por procesos secundarios.
Nota
Aunque es posible almacenar un puntero en la memoria compartida, recuerde que esto hará referencia a una ubicación en el espacio de direcciones de un proceso específico. Sin embargo, es muy probable que el puntero no sea válido en el contexto de un segundo proceso y tratar de eliminar la referencia del puntero del segundo proceso puede provocar un bloqueo.
-
multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
-
Devuelve una matriz ctypes asignada desde la memoria compartida.
typecode_or_type determina el tipo de los elementos de la matriz devuelta: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el
array
módulo. Si size_or_initializer es un número entero, entonces determina la longitud de la matriz, y la matriz se pondrá inicialmente a cero. De lo contrario size_or_initializer es una secuencia que se utiliza para inicializar la matriz y cuya longitud determina la longitud de la matriz.Tenga en cuenta que configurar y obtener un elemento es potencialmente no atómico: use
Array()
en su lugar, para asegurarse de que el acceso se sincronice automáticamente mediante un candado.
-
multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
-
Devuelve un objeto ctypes asignado desde la memoria compartida.
typecode_or_type determina el tipo del objeto devuelto: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el
array
módulo. * argumentos se pasa al constructor para el tipo.Tenga en cuenta que configurar y obtener el valor es potencialmente no atómico: use
Value()
en su lugar, para asegurarse de que el acceso se sincronice automáticamente mediante un candado.Tenga en cuenta que una matriz de
ctypes.c_char
tienevalue
yraw
atributos que permiten usarlo para almacenar y recuperar cadenas; consulte la documentación paractypes
.
-
multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
-
Lo mismo que
RawArray()
excepto que dependiendo del valor de cerrar con llave se puede devolver un contenedor de sincronización seguro para el proceso en lugar de una matriz ctypes sin formato.Si cerrar con llave es
True
(predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es unLock
oRLock
objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave esFalse
entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.
-
multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
-
Lo mismo que
RawValue()
excepto que dependiendo del valor de cerrar con llave se puede devolver un contenedor de sincronización seguro para el proceso en lugar de un objeto ctypes sin formato.Si cerrar con llave es
True
(predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es unLock
oRLock
objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave esFalse
entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.
-
multiprocessing.sharedctypes.copy(obj)
-
Devuelve un objeto ctypes asignado desde la memoria compartida que es una copia del objeto ctypes obj.
-
multiprocessing.sharedctypes.synchronized(obj[, lock])
-
Devuelve un objeto contenedor seguro para el proceso para un objeto ctypes que usa cerrar con llave para sincronizar el acceso. Si cerrar con llave es
None
(el predeterminado) luego unmultiprocessing.RLock
El objeto se crea automáticamente.Un contenedor sincronizado tendrá dos métodos además de los del objeto que envuelve:
get_obj()
devuelve el objeto envuelto yget_lock()
devuelve el objeto de bloqueo utilizado para la sincronización.Tenga en cuenta que acceder al objeto ctypes a través del contenedor puede ser mucho más lento que acceder al objeto ctypes sin formato.
Modificado en la versión 3.5: Los objetos sincronizados admiten la administrador de contexto protocolo.
La siguiente tabla compara la sintaxis para crear objetos ctypes compartidos desde la memoria compartida con la sintaxis normal de ctypes. (En la mesa MyStruct
es una subclase de ctypes.Structure
.)
ctypes |
sharedctypes usando el tipo |
sharedctypes usando typecode |
---|---|---|
c_double (2,4) |
RawValue (c_double, 2.4) |
RawValue (‘d’, 2.4) |
MyStruct (4, 6) |
RawValue (MyStruct, 4, 6) |
|
(c_corto * 7) () |
RawArray (c_short, 7) |
RawArray (‘h’, 7) |
(c_int * 3) (9, 2, 8) |
RawArray (c_int, (9, 2, 8)) |
RawArray (‘i’, (9, 2, 8)) |
A continuación se muestra un ejemplo en el que un proceso hijo modifica varios objetos ctypes:
from multiprocessing import Process, Lock from multiprocessing.sharedctypes import Value, Array from ctypes import Structure, c_double class Point(Structure): _fields_ = [('x', c_double), ('y', c_double)] def modify(n, x, s, A): n.value **= 2 x.value **= 2 s.value = s.value.upper() for a in A: a.x **= 2 a.y **= 2 if __name__ == '__main__': lock = Lock() n = Value('i', 7) x = Value(c_double, 1.0/3.0, lock=False) s = Array('c', b'hello world', lock=lock) A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock) p = Process(target=modify, args=(n, x, s, A)) p.start() p.join() print(n.value) print(x.value) print(s.value) print([(a.x, a.y) for a in A])
Los resultados impresos son
49 0.1111111111111111 HELLO WORLD [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
Gerentes
Los administradores proporcionan una forma de crear datos que se pueden compartir entre diferentes procesos, incluido el intercambio a través de una red entre procesos que se ejecutan en diferentes máquinas. Un objeto administrador controla un proceso de servidor que administra objetos compartidos. Otros procesos pueden acceder a los objetos compartidos mediante el uso de proxies.
-
multiprocessing.Manager()
-
Devuelve un iniciado
SyncManager
objeto que se puede utilizar para compartir objetos entre procesos. El objeto de administrador devuelto corresponde a un proceso hijo generado y tiene métodos que crearán objetos compartidos y devolverán los proxies correspondientes.
Los procesos de administrador se cerrarán tan pronto como se recojan la basura o se cierre el proceso principal. Las clases de administradores se definen en el multiprocessing.managers
módulo:
-
class multiprocessing.managers.BaseManager([address[, authkey]])
-
Cree un objeto BaseManager.
Una vez creado uno debe llamar
start()
oget_server().serve_forever()
para asegurarse de que el objeto de administrador se refiere a un proceso de administrador iniciado.Dirección es la dirección en la que el proceso de administrador escucha nuevas conexiones. Si Dirección es
None
luego se elige uno arbitrario.Clave de autenticación es la clave de autenticación que se utilizará para verificar la validez de las conexiones entrantes al proceso del servidor. Si Clave de autenticación es
None
luegocurrent_process().authkey
se utiliza. De lo contrario Clave de autenticación se utiliza y debe ser una cadena de bytes.-
start([initializer[, initargs]])
-
Inicie un subproceso para iniciar el administrador. Si inicializador no es
None
entonces el subproceso llamaráinitializer(*initargs)
cuando comienza.
-
get_server()
-
Devuelve un
Server
objeto que representa el servidor real bajo el control del Administrador. losServer
objeto soporta elserve_forever()
método:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
además tiene unaddress
atributo.
-
connect()
-
Conecte un objeto de administrador local a un proceso de administrador remoto:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown()
-
Detenga el proceso utilizado por el gerente. Esto solo está disponible si
start()
se ha utilizado para iniciar el proceso del servidor.Esto se puede llamar varias veces.
-
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
-
Un método de clase que se puede utilizar para registrar un tipo o llamar con la clase de administrador.
typeid es un “identificador de tipo” que se utiliza para identificar un tipo particular de objeto compartido. Debe ser una cadena.
invocable es un invocable que se utiliza para crear objetos para este tipo de identificador. Si una instancia de administrador se conectará al servidor mediante el
connect()
método, o si el create_method el argumento esFalse
entonces esto se puede dejar comoNone
.tipo de proxy es una subclase de
BaseProxy
que se utiliza para crear proxies para objetos compartidos con este typeid. SiNone
luego, se crea automáticamente una clase de proxy.expuesto se utiliza para especificar una secuencia de nombres de métodos a los que se debe permitir el acceso de los proxies para este typeid utilizando
BaseProxy._callmethod()
. (Si expuesto esNone
luegoproxytype._exposed_
se utiliza en su lugar si existe.) En el caso de que no se especifique una lista expuesta, todos los “métodos públicos” del objeto compartido serán accesibles. (Aquí, un “método público” significa cualquier atributo que tiene un__call__()
método y cuyo nombre no comienza con'_'
.)method_to_typeid es un mapeo utilizado para especificar el tipo de retorno de los métodos expuestos que deberían devolver un proxy. Asigna nombres de métodos a cadenas de typeid. (Si method_to_typeid es
None
luegoproxytype._method_to_typeid_
se utiliza en su lugar si existe.) Si el nombre de un método no es una clave de este mapeo o si el mapeo esNone
luego, el objeto devuelto por el método se copiará por valor.create_method determina si se debe crear un método con nombre typeid que se puede usar para decirle al proceso del servidor que cree un nuevo objeto compartido y le devuelva un proxy. Por defecto es
True
.
BaseManager
Las instancias también tienen una propiedad de solo lectura:-
address
-
La dirección utilizada por el gerente.
Modificado en la versión 3.3: Los objetos de administrador admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto.
__enter__()
inicia el proceso del servidor (si aún no se ha iniciado) y luego devuelve el objeto administrador.__exit__()
llamadasshutdown()
.En versiones anteriores
__enter__()
no inició el proceso del servidor del administrador si aún no se había iniciado. -
-
class multiprocessing.managers.SyncManager
-
Una subclase de
BaseManager
que se puede utilizar para la sincronización de procesos. Los objetos de este tipo son devueltos pormultiprocessing.Manager()
.Sus métodos crean y regresan Objetos proxy para que varios tipos de datos de uso común se sincronicen entre procesos. Esto incluye, en particular, listas y diccionarios compartidos.
-
Barrier(parties[, action[, timeout]])
-
Crea un compartido
threading.Barrier
objeto y devuelve un proxy para él.Nuevo en la versión 3.3.
-
BoundedSemaphore([value])
-
Crea un compartido
threading.BoundedSemaphore
objeto y devuelve un proxy para él.
-
Condition([lock])
-
Crea un compartido
threading.Condition
objeto y devuelve un proxy para él.Si cerrar con llave se proporciona, entonces debería ser un proxy para un
threading.Lock
othreading.RLock
objeto.Modificado en la versión 3.3: los
wait_for()
se agregó el método.
-
Event()
-
Crea un compartido
threading.Event
objeto y devuelve un proxy para él.
-
Lock()
-
Crea un compartido
threading.Lock
objeto y devuelve un proxy para él.
-
Namespace()
-
Crea un compartido
Namespace
objeto y devuelve un proxy para él.
-
Queue([maxsize])
-
Crea un compartido
queue.Queue
objeto y devuelve un proxy para él.
-
RLock()
-
Crea un compartido
threading.RLock
objeto y devuelve un proxy para él.
-
Semaphore([value])
-
Crea un compartido
threading.Semaphore
objeto y devuelve un proxy para él.
-
Array(typecode, sequence)
-
Cree una matriz y devuelva un proxy para ella.
-
Value(typecode, value)
-
Crea un objeto con escritura
value
atributo y devolver un proxy para él.
-
dict()
-
dict(mapping)
-
dict(sequence)
-
Crea un compartido
dict
objeto y devuelve un proxy para él.
-
list()
-
list(sequence)
-
Crea un compartido
list
objeto y devuelve un proxy para él.
Modificado en la versión 3.6: Los objetos compartidos se pueden anidar. Por ejemplo, un objeto contenedor compartido, como una lista compartida, puede contener otros objetos compartidos que serán administrados y sincronizados por el
SyncManager
. -
-
class multiprocessing.managers.Namespace
-
Un tipo que puede registrarse con
SyncManager
.Un objeto de espacio de nombres no tiene métodos públicos, pero tiene atributos de escritura. Su representación muestra los valores de sus atributos.
Sin embargo, cuando se usa un proxy para un objeto de espacio de nombres, un atributo que comienza con
'_'
será un atributo del proxy y no un atributo del referente:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Gerentes personalizados
Para crear su propio gerente, se crea una subclase de BaseManager
y usa el register()
classmethod para registrar nuevos tipos o invocables con la clase del administrador. Por ejemplo:
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) # prints 7 print(maths.mul(7, 8)) # prints 56
Usando un administrador remoto
Es posible ejecutar un servidor administrador en una máquina y hacer que los clientes lo usen desde otras máquinas (asumiendo que los firewalls involucrados lo permitan).
La ejecución de los siguientes comandos crea un servidor para una única cola compartida a la que pueden acceder los clientes remotos:
>>> from multiprocessing.managers import BaseManager >>> from queue import Queue >>> queue = Queue() >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue', callable=lambda:queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()
Un cliente puede acceder al servidor de la siguiente manera:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.put('hello')
Otro cliente también puede usarlo:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.get() 'hello'
Los procesos locales también pueden acceder a esa cola, utilizando el código de arriba en el cliente para acceder a ella de forma remota:
>>> from multiprocessing import Process, Queue >>> from multiprocessing.managers import BaseManager >>> class Worker(Process): ... def __init__(self, q): ... self.q = q ... super().__init__() ... def run(self): ... self.q.put('local hello') ... >>> queue = Queue() >>> w = Worker(queue) >>> w.start() >>> class QueueManager(BaseManager): pass ... >>> QueueManager.register('get_queue', callable=lambda: queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()
Objetos proxy
Un proxy es un objeto que se refiere a un objeto compartido que vive (presumiblemente) en un proceso diferente. Se dice que el objeto compartido es el referente del proxy. Varios objetos proxy pueden tener el mismo referente.
Un objeto proxy tiene métodos que invocan los métodos correspondientes de su referente (aunque no todos los métodos del referente estarán necesariamente disponibles a través del proxy). De esta forma, un proxy se puede utilizar como su referente:
>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]
Tenga en cuenta que aplicando str()
a un proxy devolverá la representación del referente, mientras que la aplicación repr()
devolverá la representación del apoderado.
Una característica importante de los objetos proxy es que se pueden seleccionar para que puedan pasarse entre procesos. Como tal, un referente puede contener Objetos proxy. Esto permite el anidamiento de estas listas administradas, dictados y otros Objetos proxy:
>>> a = manager.list() >>> b = manager.list() >>> a.append(b) # referent of a now contains referent of b >>> print(a, b) [<ListProxy object, typeid 'list' at ...>] [] >>> b.append('hello') >>> print(a[0], b) ['hello'] ['hello']
De manera similar, los proxies dict y list pueden estar anidados uno dentro del otro:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ]) >>> d_first_inner = l_outer[0] >>> d_first_inner['a'] = 1 >>> d_first_inner['b'] = 2 >>> l_outer[1]['c'] = 3 >>> l_outer[1]['z'] = 26 >>> print(l_outer[0]) {'a': 1, 'b': 2} >>> print(l_outer[1]) {'c': 3, 'z': 26}
Si es estándar (no proxy) list
o dict
los objetos están contenidos en un referente, las modificaciones a esos valores mutables no se propagarán a través del administrador porque el proxy no tiene forma de saber cuándo se modifican los valores contenidos dentro. Sin embargo, almacenar un valor en un proxy de contenedor (que desencadena una __setitem__
en el objeto proxy) se propaga a través del administrador y, por lo tanto, para modificar efectivamente dicho elemento, se podría reasignar el valor modificado al proxy contenedor:
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # updating the dictionary, the proxy is notified of the change lproxy[0] = d
Este enfoque es quizás menos conveniente que emplear anidados Objetos proxy para la mayoría de los casos de uso, pero también demuestra un nivel de control sobre la sincronización.
Nota
El proxy escribe multiprocessing
No haga nada para respaldar las comparaciones por valor. Entonces, por ejemplo, tenemos:
>>> manager.list([1,2,3]) == [1,2,3] False
En su lugar, se debe usar una copia del referente al hacer comparaciones.
-
class multiprocessing.managers.BaseProxy
-
Los objetos proxy son instancias de subclases de
BaseProxy
.-
_callmethod(methodname[, args[, kwds]])
-
Llame y devuelva el resultado de un método del referente del proxy.
Si
proxy
es un proxy cuyo referente esobj
luego la expresiónproxy._callmethod(methodname, args, kwds)
evaluará la expresión
getattr(obj, methodname)(*args, **kwds)
en el proceso del gerente.
El valor devuelto será una copia del resultado de la llamada o un proxy a un nuevo objeto compartido; consulte la documentación del method_to_typeid argumento de
BaseManager.register()
.Si la llamada genera una excepción, la vuelve a generar
_callmethod()
. Si se genera alguna otra excepción en el proceso del gerente, esto se convierte en unaRemoteError
excepción y es planteado por_callmethod()
.Tenga en cuenta en particular que se generará una excepción si nombre del método no ha sido expuesto.
Un ejemplo del uso de
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue()
-
Devuelva una copia del referente.
Si el referente no se puede seleccionar, se generará una excepción.
-
__repr__()
-
Devuelve una representación del objeto proxy.
-
__str__()
-
Devuelve la representación del referente.
-
Limpiar
Un objeto proxy usa una devolución de llamada débil de modo que cuando se recolecta la basura, se anula el registro del administrador que posee su referente.
Un objeto compartido se elimina del proceso de administrador cuando ya no hay proxies que se refieran a él.
Grupos de procesos
Se puede crear un conjunto de procesos que llevarán a cabo las tareas que se le envíen con el Pool
clase.
-
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
-
Objeto de grupo de procesos que controla un grupo de procesos de trabajo a los que se pueden enviar trabajos. Admite resultados asincrónicos con tiempos de espera y devoluciones de llamada y tiene una implementación de mapa paralela.
procesos es el número de procesos de trabajo que se utilizarán. Si procesos es
None
luego el número devuelto poros.cpu_count()
se utiliza.Si inicializador no es
None
luego cada proceso de trabajador llamaráinitializer(*initargs)
cuando comienza.maxtasksperchild es la cantidad de tareas que un proceso de trabajo puede completar antes de salir y ser reemplazado por un proceso de trabajo nuevo, para permitir que se liberen los recursos no utilizados. El valor por defecto maxtasksperchild es
None
, lo que significa que los procesos de trabajo vivirán tanto como el grupo.contexto se puede utilizar para especificar el contexto utilizado para iniciar los procesos de trabajo. Por lo general, un grupo se crea usando la función
multiprocessing.Pool()
o laPool()
método de un objeto de contexto. En ambos casos contexto está configurado correctamente.Tenga en cuenta que los métodos del objeto de grupo solo deben ser invocados por el proceso que creó el grupo.
Advertencia
multiprocessing.pool
los objetos tienen recursos internos que deben administrarse correctamente (como cualquier otro recurso) mediante el uso del grupo como administrador de contexto o llamandoclose()
yterminate()
a mano. No hacer esto puede hacer que el proceso se quede pendiente de la finalización.Tenga en cuenta que es incorrecto confiar en el recolector de basura para destruir el grupo, ya que CPython no asegura que se llamará al finalizador del grupo (consulte
object.__del__()
para más información).Nuevo en la versión 3.2: maxtasksperchild
Nuevo en la versión 3.4: contexto
Nota
Procesos de trabajo dentro de un
Pool
normalmente viven durante la duración completa de la cola de trabajo de la agrupación. Un patrón frecuente que se encuentra en otros sistemas (como Apache, mod_wsgi, etc.) para liberar recursos en poder de los trabajadores es permitir que un trabajador dentro de un grupo complete solo una cantidad determinada de trabajo antes de salir, limpiarse y generar un nuevo proceso. para reemplazar el anterior. los maxtasksperchild argumento a laPool
expone esta capacidad al usuario final.-
apply(func[, args[, kwds]])
-
Llama func con argumentos argumentos y argumentos de palabras clave kwds. Bloquea hasta que el resultado está listo. Dados estos bloques,
apply_async()
es más adecuado para realizar trabajos en paralelo. Adicionalmente, func solo se ejecuta en uno de los trabajadores de la piscina.
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
-
Una variante del
apply()
método que devuelve unAsyncResult
objeto.Si llamar de vuelta se especifica, entonces debería ser un invocable que acepte un solo argumento. Cuando el resultado esté listo llamar de vuelta se le aplica, es decir, a menos que la llamada haya fallado, en cuyo caso el error_callback se aplica en su lugar.
Si error_callback se especifica, entonces debería ser un invocable que acepte un solo argumento. Si la función de destino falla, entonces la error_callback se llama con la instancia de excepción.
Las devoluciones de llamada deben completarse de inmediato, ya que de lo contrario se bloqueará el hilo que maneja los resultados.
-
map(func, iterable[, chunksize])
-
Un equivalente paralelo de la
map()
función incorporada (solo admite una iterable sin embargo, para varios iterables, consultestarmap()
). Bloquea hasta que el resultado está listo.Este método divide el iterable en varios fragmentos que envía al grupo de procesos como tareas separadas. El tamaño (aproximado) de estos trozos se puede especificar configurando tamaño de porción a un entero positivo.
Tenga en cuenta que puede causar un alto uso de memoria para iterables muy largos. Considere usar
imap()
oimap_unordered()
con explícito tamaño de porción opción para una mejor eficiencia.
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
-
Una variante del
map()
método que devuelve unAsyncResult
objeto.Si llamar de vuelta se especifica, entonces debería ser un invocable que acepte un solo argumento. Cuando el resultado esté listo llamar de vuelta se le aplica, es decir, a menos que la llamada haya fallado, en cuyo caso el error_callback se aplica en su lugar.
Si error_callback se especifica, entonces debería ser un invocable que acepte un solo argumento. Si la función de destino falla, entonces la error_callback se llama con la instancia de excepción.
Las devoluciones de llamada deben completarse de inmediato, ya que de lo contrario se bloqueará el hilo que maneja los resultados.
-
imap(func, iterable[, chunksize])
-
Una versión más perezosa de
map()
.los tamaño de porción argumento es el mismo que el utilizado por el
map()
método. Para iterables muy largos usando un gran valor para tamaño de porción puede completar el trabajo mucho más rápido que usar el valor predeterminado de1
.También si tamaño de porción es
1
entonces elnext()
método del iterador devuelto por elimap()
El método tiene una opción se acabó el tiempo parámetro:next(timeout)
elevarámultiprocessing.TimeoutError
si el resultado no se puede devolver dentro de se acabó el tiempo segundos.
-
imap_unordered(func, iterable[, chunksize])
-
Lo mismo que
imap()
excepto que el orden de los resultados del iterador devuelto debe considerarse arbitrario. (Solo cuando hay un solo proceso de trabajo se garantiza que el pedido es “correcto”).
-
starmap(func, iterable[, chunksize])
-
Igual que
map()
excepto que los elementos del iterable se espera que sean iterables que se descomprimen como argumentos.Por lo tanto, un iterable de
[(1,2), (3, 4)]
resultados en[func(1,2),
.
func(3,4)]Nuevo en la versión 3.3.
-
starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
-
Una combinación de
starmap()
ymap_async()
que itera sobre iterable de iterables y llamadas func con los iterables desempaquetados. Devuelve un objeto de resultado.Nuevo en la versión 3.3.
-
close()
-
Evita que se envíen más tareas al grupo. Una vez que se hayan completado todas las tareas, los procesos de trabajo se cerrarán.
-
terminate()
-
Detiene los procesos del trabajador inmediatamente sin completar el trabajo pendiente. Cuando el objeto de la piscina se recolecta como basura
terminate()
será llamado inmediatamente.
-
join()
-
Espere a que salgan los procesos de trabajo. Hay que llamar
close()
oterminate()
antes de usarjoin()
.
Nuevo en la versión 3.3: Los objetos de grupo ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto.
__enter__()
devuelve el objeto de la piscina, y__exit__()
llamadasterminate()
. -
-
class multiprocessing.pool.AsyncResult
-
La clase del resultado devuelto por
Pool.apply_async()
yPool.map_async()
.-
get([timeout])
-
Devuelve el resultado cuando llegue. Si se acabó el tiempo no es
None
y el resultado no llega dentro se acabó el tiempo segundos entoncesmultiprocessing.TimeoutError
es elevado. Si la llamada remota generó una excepción, entonces esa excepción será levantada porget()
.
-
wait([timeout])
-
Espere hasta que el resultado esté disponible o hasta que se acabó el tiempo Pasan los segundos.
-
ready()
-
Devuelve si la llamada se ha completado.
-
successful()
-
Devuelve si la llamada se completó sin generar una excepción. Elevará
ValueError
si el resultado no está listo.Modificado en la versión 3.7: Si el resultado no está listo,
ValueError
se levanta en lugar deAssertionError
.
-
El siguiente ejemplo demuestra el uso de una piscina:
from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Oyentes y Clientes
Por lo general, el paso de mensajes entre procesos se realiza usando colas o usando Connection
objetos devueltos por Pipe()
.
sin embargo, el multiprocessing.connection
El módulo permite cierta flexibilidad adicional. Básicamente, proporciona una API orientada a mensajes de alto nivel para tratar con sockets o canalizaciones con nombre de Windows. También tiene soporte para autenticación implícita utilizando el hmac
módulo, y para sondear múltiples conexiones al mismo tiempo.
-
multiprocessing.connection.deliver_challenge(connection, authkey)
-
Envíe un mensaje generado aleatoriamente al otro extremo de la conexión y espere una respuesta.
Si la respuesta coincide con el resumen del mensaje usando Clave de autenticación como clave, se envía un mensaje de bienvenida al otro extremo de la conexión. De lo contrario
AuthenticationError
es elevado.
-
multiprocessing.connection.answer_challenge(connection, authkey)
-
Reciba un mensaje, calcule el resumen del mensaje usando Clave de autenticación como clave y, a continuación, envíe el resumen.
Si no se recibe un mensaje de bienvenida, entonces
AuthenticationError
es elevado.
-
multiprocessing.connection.Client(address[, family[, authkey]])
-
Intente establecer una conexión con el oyente que está usando la dirección Dirección, devolviendo un
Connection
.El tipo de conexión está determinado por familia argumento, pero esto generalmente se puede omitir ya que generalmente se puede inferir del formato de Dirección. (Ver Formatos de dirección)
Si Clave de autenticación se proporciona y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza ninguna autenticación si Clave de autenticación es Ninguno.
AuthenticationError
se genera si falla la autenticación. Ver Claves de autenticación.
-
class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])
-
Un contenedor para un socket enlazado o una tubería con nombre de Windows que está ‘escuchando’ conexiones.
Dirección es la dirección que utilizará el conector vinculado o la tubería con nombre del objeto de escucha.
Nota
Si se utiliza una dirección de ‘0.0.0.0’, la dirección no será un punto final conectable en Windows. Si necesita un punto final conectable, debe usar ‘127.0.0.1’.
familia es el tipo de conector (o tubería con nombre) que se utilizará. Esta puede ser una de las cadenas
'AF_INET'
(para un socket TCP),'AF_UNIX'
(para un socket de dominio Unix) o'AF_PIPE'
(para una tubería con nombre de Windows). De estos, solo se garantiza que el primero estará disponible. Si familia esNone
entonces la familia se infiere del formato de Dirección. Si Dirección es tambiénNone
entonces se elige un valor predeterminado. Este valor predeterminado es la familia que se supone que es la más rápida disponible. Ver Formatos de dirección. Tenga en cuenta que si familia es'AF_UNIX'
y la dirección esNone
entonces el socket se creará en un directorio temporal privado creado usandotempfile.mkstemp()
.Si el objeto de escucha usa un socket, entonces reserva (1 por defecto) se pasa al
listen()
método del zócalo una vez que se ha unido.Si Clave de autenticación se proporciona y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza ninguna autenticación si Clave de autenticación es Ninguno.
AuthenticationError
se genera si falla la autenticación. Ver Claves de autenticación.-
accept()
-
Acepte una conexión en el socket vinculado o la tubería con nombre del objeto de escucha y devuelva un
Connection
objeto. Si se intenta la autenticación y falla, entoncesAuthenticationError
es elevado.
-
close()
-
Cierre el conector vinculado o la tubería con nombre del objeto de escucha. Esto se llama automáticamente cuando el oyente es recolectado como basura. Sin embargo, es aconsejable llamarlo explícitamente.
Los objetos de escucha tienen las siguientes propiedades de solo lectura:
-
address
-
La dirección que está utilizando el objeto Listener.
-
last_accepted
-
La dirección de la que provino la última conexión aceptada. Si esto no está disponible, entonces es
None
.
Nuevo en la versión 3.3: Los objetos de escucha ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto.
__enter__()
devuelve el objeto de escucha y__exit__()
llamadasclose()
. -
-
multiprocessing.connection.wait(object_list, timeout=None)
-
Espere hasta que entre un objeto lista_objetos está listo. Devuelve la lista de esos objetos en lista_objetos que están listos. Si se acabó el tiempo es un flotador, entonces la llamada se bloquea durante como máximo esa cantidad de segundos. Si se acabó el tiempo es
None
luego se bloqueará por un período ilimitado. Un tiempo de espera negativo equivale a un tiempo de espera cero.Tanto para Unix como para Windows, un objeto puede aparecer en lista_objetos si esto es
- un legible
Connection
objeto; - un conectado y legible
socket.socket
objeto; o - los
sentinel
atributo de unProcess
objeto.
Una conexión o un objeto de socket está listo cuando hay datos disponibles para leer de él, o el otro extremo se ha cerrado.
Unix:
wait(object_list, timeout)
casi equivalenteselect.select(object_list, [], [], timeout)
. La diferencia es que, siselect.select()
es interrumpido por una señal, puede aumentarOSError
con un número de error deEINTR
, mientras quewait()
no.Ventanas: Un elemento en lista_objetos debe ser un identificador entero que sea de espera (de acuerdo con la definición utilizada por la documentación de la función Win32
WaitForMultipleObjects()
) o puede ser un objeto con unfileno()
método que devuelve un identificador de enchufe o un identificador de tubería. (Tenga en cuenta que las manijas de las tuberías y las manijas no asas de espera.)Nuevo en la versión 3.3.
- un legible
Ejemplos de
El siguiente código de servidor crea un oyente que usa 'secret password'
como clave de autenticación. Luego espera una conexión y envía algunos datos al cliente:
from multiprocessing.connection import Listener from array import array address = ('localhost', 6000) # family is deduced to be 'AF_INET' with Listener(address, authkey=b'secret password') as listener: with listener.accept() as conn: print('connection accepted from', listener.last_accepted) conn.send([2.25, None, 'junk', float]) conn.send_bytes(b'hello') conn.send_bytes(array('i', [42, 1729]))
El siguiente código se conecta al servidor y recibe algunos datos del servidor:
from multiprocessing.connection import Client from array import array address = ('localhost', 6000) with Client(address, authkey=b'secret password') as conn: print(conn.recv()) # => [2.25, None, 'junk', float] print(conn.recv_bytes()) # => 'hello' arr = array('i', [0, 0, 0, 0, 0]) print(conn.recv_bytes_into(arr)) # => 8 print(arr) # => array('i', [42, 1729, 0, 0, 0])
El siguiente código usa wait()
para esperar mensajes de varios procesos a la vez:
import time, random from multiprocessing import Process, Pipe, current_process from multiprocessing.connection import wait def foo(w): for i in range(10): w.send((i, current_process().name)) w.close() if __name__ == '__main__': readers = [] for i in range(4): r, w = Pipe(duplex=False) readers.append(r) p = Process(target=foo, args=(w,)) p.start() # We close the writable end of the pipe now to be sure that # p is the only process which owns a handle for it. This # ensures that when p closes its handle for the writable end, # wait() will promptly report the readable end as being ready. w.close() while readers: for r in wait(readers): try: msg = r.recv() except EOFError: readers.remove(r) else: print(msg)
Formatos de dirección
- Un
'AF_INET'
dirección es una tupla del formulario(hostname, port)
dónde nombre de host es una cuerda y Puerto es un entero. - Un
'AF_UNIX'
dirección es una cadena que representa un nombre de archivo en el sistema de archivos. - Un
'AF_PIPE'
dirección es una cadena de la formar'.pipe{PipeName}'
. UsarClient()
para conectarse a una tubería con nombre en una computadora remota llamada Nombre del servidor uno debe usar una dirección del formularior'ServerNamepipe{PipeName}'
en lugar de.
Tenga en cuenta que cualquier cadena que comience con dos barras diagonales inversas se asume de forma predeterminada como una 'AF_PIPE'
dirección en lugar de una 'AF_UNIX'
Dirección.
Claves de autenticación
Cuando uno usa Connection.recv
, los datos recibidos se eliminan automáticamente. Desafortunadamente, eliminar datos de una fuente que no es de confianza es un riesgo para la seguridad. Por lo tanto Listener
y Client()
utilizar el hmac
módulo para proporcionar autenticación implícita.
Una clave de autenticación es una cadena de bytes que se puede considerar como una contraseña: una vez que se establece una conexión, ambos extremos exigirán una prueba de que el otro conoce la clave de autenticación. (Demostrar que ambos extremos usan la misma clave no no implica enviar la clave a través de la conexión).
Si se solicita autenticación pero no se especifica una clave de autenticación, el valor de retorno de current_process().authkey
se utiliza (ver Process
). Este valor será heredado automáticamente por cualquier Process
objeto que crea el proceso actual. Esto significa que (de forma predeterminada) todos los procesos de un programa multiproceso compartirán una única clave de autenticación que se puede utilizar al configurar conexiones entre ellos.
También se pueden generar claves de autenticación adecuadas utilizando os.urandom()
.
Inicio sesión
Está disponible algún soporte para el registro. Sin embargo, tenga en cuenta que el logging
El paquete no utiliza bloqueos compartidos de procesos, por lo que es posible (según el tipo de controlador) que se mezclen los mensajes de diferentes procesos.
-
multiprocessing.get_logger()
-
Devuelve el registrador utilizado por
multiprocessing
. Si es necesario, se creará uno nuevo.Cuando se crea por primera vez, el registrador tiene nivel
logging.NOTSET
y ningún controlador predeterminado. Los mensajes enviados a este registrador no se propagarán por defecto al registrador raíz.Tenga en cuenta que en los procesos secundarios de Windows solo heredarán el nivel del registrador del proceso principal; no se heredará ninguna otra personalización del registrador.
-
multiprocessing.log_to_stderr()
-
Esta función realiza una llamada a
get_logger()
pero además de devolver el registrador creado por get_logger, agrega un controlador que envía la salida asys.stderr
usando formato'[%(levelname)s/%(processName)s] %(message)s'
.
A continuación se muestra una sesión de ejemplo con el registro activado:
>>> import multiprocessing, logging >>> logger = multiprocessing.log_to_stderr() >>> logger.setLevel(logging.INFO) >>> logger.warning('doomed') [WARNING/MainProcess] doomed >>> m = multiprocessing.Manager() [INFO/SyncManager-...] child process calling self.run() [INFO/SyncManager-...] created temp directory /.../pymp-... [INFO/SyncManager-...] manager serving at '/.../listener-...' >>> del m [INFO/MainProcess] sending shutdown message to manager [INFO/SyncManager-...] manager exiting with exitcode 0
Para obtener una tabla completa de los niveles de registro, consulte la logging
módulo.
los multiprocessing.dummy
módulo
multiprocessing.dummy
replica la API de multiprocessing
pero no es más que una envoltura alrededor del threading
módulo.
En particular, el Pool
función proporcionada por multiprocessing.dummy
devuelve una instancia de ThreadPool
, que es una subclase de Pool
que admite todas las mismas llamadas a métodos, pero utiliza un grupo de subprocesos de trabajo en lugar de procesos de trabajo.
-
class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])
-
Un objeto de grupo de subprocesos que controla un grupo de subprocesos de trabajo a los que se pueden enviar trabajos.
ThreadPool
las instancias son totalmente compatibles con la interfazPool
instancias, y sus recursos también deben administrarse correctamente, ya sea utilizando el grupo como administrador de contexto o llamandoclose()
yterminate()
a mano.procesos es el número de subprocesos de trabajo que se utilizarán. Si procesos es
None
luego el número devuelto poros.cpu_count()
se utiliza.Si inicializador no es
None
luego cada proceso de trabajador llamaráinitializer(*initargs)
cuando comienza.diferente a
Pool
, maxtasksperchild y contexto no se puede proporcionar.Nota
A
ThreadPool
comparte la misma interfaz quePool
, que está diseñado en torno a un conjunto de procesos y es anterior a la introducción de laconcurrent.futures
módulo. Como tal, hereda algunas operaciones que no tienen sentido para un grupo respaldado por subprocesos y tiene su propio tipo para representar el estado de los trabajos asincrónicos.AsyncResult
, eso no es entendido por ninguna otra biblioteca.Los usuarios generalmente deberían preferir usar
concurrent.futures.ThreadPoolExecutor
, que tiene una interfaz más simple que se diseñó en torno a subprocesos desde el principio y que devuelveconcurrent.futures.Future
instancias que son compatibles con muchas otras bibliotecas, incluidasasyncio
.
Pautas de programación
Hay ciertas pautas y modismos que deben seguirse al usar multiprocessing
.
Todos los métodos de inicio
Lo siguiente se aplica a todos los métodos de inicio.
Evite el estado compartido
En la medida de lo posible, se debe intentar evitar la transferencia de grandes cantidades de datos entre procesos.
Probablemente sea mejor ceñirse al uso de colas o conductos para la comunicación entre procesos en lugar de usar las primitivas de sincronización de nivel inferior.
Picklability
Asegúrese de que los argumentos de los métodos de los proxies se puedan seleccionar.
Seguridad de subprocesos de proxies
No utilice un objeto proxy de más de un hilo a menos que lo proteja con un candado.
(Nunca hay un problema con los diferentes procesos que utilizan el mismo apoderado.)
Unirse a procesos zombies
En Unix, cuando un proceso finaliza pero no se ha unido, se convierte en un zombi. Nunca debería haber muchos porque cada vez que comienza un nuevo proceso (o active_children()
se llama) se unirán todos los procesos completados que aún no se hayan unido. También llamando a un proceso terminado Process.is_alive
se unirá al proceso. Aun así, probablemente sea una buena práctica unirse explícitamente a todos los procesos que inicie.
Mejor heredar que encurtir / despegar
Al usar el Aparecer o tenedor métodos de inicio de muchos tipos desde multiprocessing
deben ser decapables para que los procesos secundarios puedan usarlos. Sin embargo, generalmente se debe evitar enviar objetos compartidos a otros procesos utilizando canalizaciones o colas. En su lugar, debe organizar el programa de modo que un proceso que necesita acceso a un recurso compartido creado en otro lugar pueda heredarlo de un proceso anterior.
Evite la terminación de procesos
Utilizando el Process.terminate
El método para detener un proceso puede provocar que cualquier recurso compartido (como bloqueos, semáforos, conductos y colas) que esté utilizando actualmente el proceso se rompa o no esté disponible para otros procesos.
Por lo tanto, probablemente sea mejor considerar solo el uso Process.terminate
en procesos que nunca utilizan recursos compartidos.
Unirse a procesos que usan colas
Tenga en cuenta que un proceso que ha puesto elementos en una cola esperará antes de terminar hasta que todos los elementos almacenados en búfer sean alimentados por el subproceso “alimentador” a la tubería subyacente. (El proceso hijo puede llamar al Queue.cancel_join_thread
método de la cola para evitar este comportamiento).
Esto significa que cada vez que use una cola, debe asegurarse de que todos los elementos que se han colocado en la cola se eliminarán eventualmente antes de unirse al proceso. De lo contrario, no puede estar seguro de que terminarán los procesos que han puesto elementos en la cola. Recuerde también que los procesos no demoníacos se unirán automáticamente.
Un ejemplo que se interbloqueará es el siguiente:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()
Una solución aquí sería intercambiar las dos últimas líneas (o simplemente eliminar el p.join()
línea).
Pasar recursos explícitamente a procesos secundarios
En Unix usando el tenedor método de inicio, un proceso hijo puede hacer uso de un recurso compartido creado en un proceso padre utilizando un recurso global. Sin embargo, es mejor pasar el objeto como argumento al constructor del proceso hijo.
Además de hacer que el código sea (potencialmente) compatible con Windows y los otros métodos de inicio, esto también asegura que mientras el proceso hijo todavía esté vivo, el objeto no será recolectado como basura en el proceso padre. Esto puede ser importante si se libera algún recurso cuando el objeto se recolecta como basura en el proceso principal.
Entonces, por ejemplo
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()
debería ser reescrito como
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Cuidado con reemplazar sys.stdin
con un “archivo como objeto”
multiprocessing
originalmente llamado incondicionalmente:
os.close(sys.stdin.fileno())
en el multiprocessing.Process._bootstrap()
método: esto dio lugar a problemas con los procesos en proceso. Esto se ha cambiado a:
sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)
Lo que resuelve el problema fundamental de los procesos que chocan entre sí, lo que da como resultado un error de descriptor de archivo incorrecto, pero presenta un peligro potencial para las aplicaciones que reemplazan sys.stdin()
con un “objeto similar a un archivo” con almacenamiento en búfer de salida. Este peligro es que si varios procesos llaman close()
en este objeto similar a un archivo, podría dar como resultado que los mismos datos se viertan al objeto varias veces, lo que da como resultado la corrupción.
Si escribe un objeto similar a un archivo e implementa su propio almacenamiento en caché, puede hacerlo seguro para la bifurcación almacenando el pid cada vez que lo agrega al caché y descartando el caché cuando cambia el pid. Por ejemplo:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
Para más información, ver bpo-5155, bpo-5313 y bpo-5331
los Aparecer y tenedor métodos de inicio
Hay algunas restricciones adicionales que no se aplican a la tenedor método de inicio.
Más picklability
Asegúrese de que todos los argumentos Process.__init__()
son encurtidos. Además, si subclase Process
a continuación, asegúrese de que las instancias sean seleccionables cuando el Process.start
se llama al método.
Variables globales
Tenga en cuenta que si el código que se ejecuta en un proceso secundario intenta acceder a una variable global, entonces el valor que ve (si lo hay) puede no ser el mismo que el valor en el proceso principal en el momento en que Process.start
fue llamado.
Sin embargo, las variables globales que son solo constantes de nivel de módulo no causan problemas.
Importación segura del módulo principal
Asegúrese de que el módulo principal pueda ser importado de forma segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados (como iniciar un nuevo proceso).
Por ejemplo, usando el Aparecer o tenedor El método de inicio que ejecuta el siguiente módulo fallaría con un RuntimeError
:
from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()
En su lugar, uno debería proteger el “punto de entrada” del programa utilizando if
como sigue:
__name__ == '__main__':
from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()
(Los freeze_support()
La línea se puede omitir si el programa se ejecutará normalmente en lugar de congelarse).
Esto permite que el intérprete de Python recién generado importe de forma segura el módulo y luego ejecute el foo()
función.
Se aplican restricciones similares si se crea un grupo o administrador en el módulo principal.
Ejemplos de
Demostración de cómo crear y utilizar administradores y proxies personalizados:
from multiprocessing import freeze_support from multiprocessing.managers import BaseManager, BaseProxy import operator ## class Foo: def f(self): print('you called Foo.f()') def g(self): print('you called Foo.g()') def _h(self): print('you called Foo._h()') # A simple generator function def baz(): for i in range(10): yield i*i # Proxy type for generator objects class GeneratorProxy(BaseProxy): _exposed_ = ['__next__'] def __iter__(self): return self def __next__(self): return self._callmethod('__next__') # Function to return the operator module def get_operator_module(): return operator ## class MyManager(BaseManager): pass # register the Foo class; make `f()` and `g()` accessible via proxy MyManager.register('Foo1', Foo) # register the Foo class; make `g()` and `_h()` accessible via proxy MyManager.register('Foo2', Foo, exposed=('g', '_h')) # register the generator function baz; use `GeneratorProxy` to make proxies MyManager.register('baz', baz, proxytype=GeneratorProxy) # register get_operator_module(); make public functions accessible via proxy MyManager.register('operator', get_operator_module) ## def test(): manager = MyManager() manager.start() print('-' * 20) f1 = manager.Foo1() f1.f() f1.g() assert not hasattr(f1, '_h') assert sorted(f1._exposed_) == sorted(['f', 'g']) print('-' * 20) f2 = manager.Foo2() f2.g() f2._h() assert not hasattr(f2, 'f') assert sorted(f2._exposed_) == sorted(['g', '_h']) print('-' * 20) it = manager.baz() for i in it: print('<%d>' % i, end=' ') print() print('-' * 20) op = manager.operator() print('op.add(23, 45) =', op.add(23, 45)) print('op.pow(2, 94) =', op.pow(2, 94)) print('op._exposed_ =', op._exposed_) ## if __name__ == '__main__': freeze_support() test()
Utilizando Pool
:
import multiprocessing import time import random import sys # # Functions used by test code # def calculate(func, args): result = func(*args) return '%s says that %s%s = %s' % ( multiprocessing.current_process().name, func.__name__, args, result ) def calculatestar(args): return calculate(*args) def mul(a, b): time.sleep(0.5 * random.random()) return a * b def plus(a, b): time.sleep(0.5 * random.random()) return a + b def f(x): return 1.0 / (x - 5.0) def pow3(x): return x ** 3 def noop(x): pass # # Test code # def test(): PROCESSES = 4 print('Creating pool with %d processesn' % PROCESSES) with multiprocessing.Pool(PROCESSES) as pool: # # Tests # TASKS = [(mul, (i, 7)) for i in range(10)] + [(plus, (i, 8)) for i in range(10)] results = [pool.apply_async(calculate, t) for t in TASKS] imap_it = pool.imap(calculatestar, TASKS) imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) print('Ordered results using pool.apply_async():') for r in results: print('t', r.get()) print() print('Ordered results using pool.imap():') for x in imap_it: print('t', x) print() print('Unordered results using pool.imap_unordered():') for x in imap_unordered_it: print('t', x) print() print('Ordered results using pool.map() --- will block till complete:') for x in pool.map(calculatestar, TASKS): print('t', x) print() # # Test error handling # print('Testing error handling:') try: print(pool.apply(f, (5,))) except ZeroDivisionError: print('tGot ZeroDivisionError as expected from pool.apply()') else: raise AssertionError('expected ZeroDivisionError') try: print(pool.map(f, list(range(10)))) except ZeroDivisionError: print('tGot ZeroDivisionError as expected from pool.map()') else: raise AssertionError('expected ZeroDivisionError') try: print(list(pool.imap(f, list(range(10))))) except ZeroDivisionError: print('tGot ZeroDivisionError as expected from list(pool.imap())') else: raise AssertionError('expected ZeroDivisionError') it = pool.imap(f, list(range(10))) for i in range(10): try: x = next(it) except ZeroDivisionError: if i == 5: pass except StopIteration: break else: if i == 5: raise AssertionError('expected ZeroDivisionError') assert i == 9 print('tGot ZeroDivisionError as expected from IMapIterator.next()') print() # # Testing timeouts # print('Testing ApplyResult.get() with timeout:', end=' ') res = pool.apply_async(calculate, TASKS[0]) while 1: sys.stdout.flush() try: sys.stdout.write('nt%s' % res.get(0.02)) break except multiprocessing.TimeoutError: sys.stdout.write('.') print() print() print('Testing IMapIterator.next() with timeout:', end=' ') it = pool.imap(calculatestar, TASKS) while 1: sys.stdout.flush() try: sys.stdout.write('nt%s' % it.next(0.02)) except StopIteration: break except multiprocessing.TimeoutError: sys.stdout.write('.') print() print() if __name__ == '__main__': multiprocessing.freeze_support() test()
Un ejemplo que muestra cómo usar colas para alimentar tareas a una colección de procesos de trabajo y recopilar los resultados:
import time import random from multiprocessing import Process, Queue, current_process, freeze_support # # Function run by worker processes # def worker(input, output): for func, args in iter(input.get, 'STOP'): result = calculate(func, args) output.put(result) # # Function used to calculate result # def calculate(func, args): result = func(*args) return '%s says that %s%s = %s' % (current_process().name, func.__name__, args, result) # # Functions referenced by tasks # def mul(a, b): time.sleep(0.5*random.random()) return a * b def plus(a, b): time.sleep(0.5*random.random()) return a + b # # # def test(): NUMBER_OF_PROCESSES = 4 TASKS1 = [(mul, (i, 7)) for i in range(20)] TASKS2 = [(plus, (i, 8)) for i in range(10)] # Create queues task_queue = Queue() done_queue = Queue() # Submit tasks for task in TASKS1: task_queue.put(task) # Start worker processes for i in range(NUMBER_OF_PROCESSES): Process(target=worker, args=(task_queue, done_queue)).start() # Get and print results print('Unordered results:') for i in range(len(TASKS1)): print('t', done_queue.get()) # Add more tasks using `put()` for task in TASKS2: task_queue.put(task) # Get and print some more results for i in range(len(TASKS2)): print('t', done_queue.get()) # Tell child processes to stop for i in range(NUMBER_OF_PROCESSES): task_queue.put('STOP') if __name__ == '__main__': freeze_support() test()