Código fuente: Lib / multiprocesamiento /

Introducción

multiprocessing es un paquete que admite procesos de generación mediante una API similar a la threading módulo. los multiprocessing El paquete ofrece simultaneidad local y remota, evitando de manera efectiva el Bloqueo de intérprete global mediante el uso de subprocesos en lugar de subprocesos. Debido a esto, el multiprocessing permite al programador aprovechar completamente varios procesadores en una máquina determinada. Funciona tanto en Unix como en Windows.

los multiprocessing El módulo también presenta API que no tienen análogos en el threading módulo. Un buen ejemplo de esto es el Pool objeto que ofrece un medio conveniente para paralelizar la ejecución de una función a través de múltiples valores de entrada, distribuyendo los datos de entrada entre procesos (paralelismo de datos). El siguiente ejemplo demuestra la práctica común de definir tales funciones en un módulo para que los procesos secundarios puedan importar ese módulo con éxito. Este ejemplo básico de paralelismo de datos usando Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

imprimirá en salida estándar

[1, 4, 9]

los Process clase

En multiprocessing, los procesos se generan creando un Process objeto y luego llamar a su start() método. Process sigue la API de threading.Thread. Un ejemplo trivial de un programa multiproceso es

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Para mostrar los ID de proceso individuales involucrados, aquí hay un ejemplo ampliado:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Para obtener una explicación de por qué if __name__ == '__main__' parte es necesaria, ver Pautas de programación.

Contextos y métodos de inicio

Dependiendo de la plataforma, multiprocessing admite tres formas de iniciar un proceso. Estas métodos de inicio están

Aparecer

El proceso padre inicia un nuevo proceso de intérprete de Python. El proceso hijo solo heredará los recursos necesarios para ejecutar el proceso del objeto run() método. En particular, no se heredarán los descriptores de archivo y los identificadores innecesarios del proceso principal. Iniciar un proceso con este método es bastante lento en comparación con usar tenedor o tenedor.

Disponible en Unix y Windows. El predeterminado en Windows y macOS.

tenedor

El proceso padre usa os.fork() para bifurcar el intérprete de Python. El proceso hijo, cuando comienza, es efectivamente idéntico al proceso padre. Todos los recursos del padre son heredados por el proceso hijo. Tenga en cuenta que la bifurcación segura de un proceso multiproceso es problemática.

Disponible solo en Unix. El predeterminado en Unix.

tenedor

Cuando el programa se inicia y selecciona el tenedor método de inicio, se inicia un proceso de servidor. A partir de ese momento, cada vez que se necesita un nuevo proceso, el proceso padre se conecta al servidor y solicita que bifurque un nuevo proceso. El proceso del servidor de bifurcación es de un solo subproceso, por lo que es seguro de usar os.fork(). No se heredan recursos innecesarios.

Disponible en plataformas Unix que admiten el paso de descriptores de archivos a través de tuberías Unix.

Modificado en la versión 3.8: En macOS, el Aparecer El método de inicio ahora es el predeterminado. los tenedor El método de inicio debe considerarse inseguro, ya que puede provocar fallas en el subproceso. Ver bpo-33725.

Modificado en la versión 3.4: Aparecer agregado en todas las plataformas Unix, y tenedor agregado para algunas plataformas Unix. Los procesos secundarios ya no heredan todos los identificadores heredables principales en Windows.

En Unix usando el Aparecer o tenedor Los métodos de inicio también iniciarán un rastreador de recursos proceso que rastrea los recursos del sistema con nombre no vinculados (como semáforos con nombre o SharedMemory objetos) creados por procesos del programa. Cuando todos los procesos han salido, el rastreador de recursos desvincula cualquier objeto rastreado restante. Por lo general, no debería haber ninguno, pero si una señal mata un proceso, puede haber algunos recursos “filtrados”. (Ni los semáforos filtrados ni los segmentos de memoria compartida se desvincularán automáticamente hasta el próximo reinicio. Esto es problemático para ambos objetos porque el sistema permite solo un número limitado de semáforos con nombre y los segmentos de memoria compartida ocupan algo de espacio en la memoria principal).

Para seleccionar un método de inicio, utilice el set_start_method() en el if __name__ == '__main__' cláusula del módulo principal. Por ejemplo:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() no debe usarse más de una vez en el programa.

Alternativamente, puede usar get_context() para obtener un objeto de contexto. Los objetos de contexto tienen la misma API que el módulo de multiprocesamiento y permiten utilizar varios métodos de inicio en el mismo programa.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Tenga en cuenta que los objetos relacionados con un contexto pueden no ser compatibles con los procesos de un contexto diferente. En particular, las cerraduras creadas con el tenedor El contexto no se puede pasar a los procesos iniciados utilizando el Aparecer o tenedor métodos de inicio.

Una biblioteca que quiera usar un método de inicio particular probablemente debería usar get_context() para evitar interferir con la elección del usuario de la biblioteca.

Advertencia

los 'spawn' y 'forkserver' Los métodos de inicio no se pueden utilizar actualmente con ejecutables “congelados” (es decir, binarios producidos por paquetes como PyInstaller y cx_Freeze) en Unix. los 'fork' El método de inicio funciona.

Intercambio de objetos entre procesos

multiprocessing admite dos tipos de canales de comunicación entre procesos:

Colas

los Queue la clase es casi un clon de queue.Queue. Por ejemplo:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Las colas son seguras para procesos y subprocesos.

Tubería

los Pipe() La función devuelve un par de objetos de conexión conectados por una tubería que por defecto es dúplex (bidireccional). Por ejemplo:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Los dos objetos de conexión devueltos por Pipe() representan los dos extremos de la tubería. Cada objeto de conexión tiene send() y recv() métodos (entre otros). Tenga en cuenta que los datos de una tubería pueden dañarse si dos procesos (o subprocesos) intentan leer o escribir en el mismo extremo de la tubería al mismo tiempo. Por supuesto, no hay riesgo de corrupción de procesos que utilizan diferentes extremos de la tubería al mismo tiempo.

Sincronización entre procesos

multiprocessing contiene equivalentes de todas las primitivas de sincronización de threading. Por ejemplo, se puede usar un candado para asegurarse de que solo un proceso se imprima en la salida estándar a la vez:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Sin utilizar la salida de bloqueo de los diferentes procesos, es probable que se confundan todos.

Compartiendo estado entre procesos

Como se mencionó anteriormente, cuando se realiza una programación simultánea, generalmente es mejor evitar el uso del estado compartido en la medida de lo posible. Esto es particularmente cierto cuando se utilizan varios procesos.

Sin embargo, si realmente necesita utilizar algunos datos compartidos, multiprocessing proporciona un par de formas de hacerlo.

Memoria compartida

Los datos se pueden almacenar en un mapa de memoria compartida usando Value o Array. Por ejemplo, el siguiente código

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

imprimirá

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

los 'd' y 'i' argumentos utilizados al crear num y arr son códigos de tipo del tipo utilizado por el array módulo: 'd' indica un flotador de doble precisión y 'i' indica un entero con signo. Estos objetos compartidos serán seguros para procesos y subprocesos.

Para mayor flexibilidad en el uso de la memoria compartida, se puede utilizar el multiprocessing.sharedctypes módulo que admite la creación de objetos ctypes arbitrarios asignados desde la memoria compartida.

Proceso del servidor

Un objeto de administrador devuelto por Manager() controla un proceso de servidor que contiene objetos Python y permite que otros procesos los manipulen usando proxies.

Un gerente regresó por Manager() apoyará tipos list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value y Array. Por ejemplo,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

imprimirá

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Los administradores de procesos de servidor son más flexibles que el uso de objetos de memoria compartida porque se pueden hacer para admitir tipos de objetos arbitrarios. Además, los procesos de diferentes computadoras en una red pueden compartir un solo administrador. Sin embargo, son más lentos que el uso de memoria compartida.

Usar un grupo de trabajadores

los Pool la clase representa un grupo de procesos de trabajo. Tiene métodos que permiten que las tareas se descarguen a los procesos de trabajo de diferentes formas.

Por ejemplo:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Tenga en cuenta que los métodos de un grupo solo deben ser utilizados por el proceso que lo creó.

Nota

La funcionalidad dentro de este paquete requiere que el __main__ que los niños puedan importar el módulo. Esto está cubierto en Pautas de programación sin embargo, vale la pena señalarlo aquí. Esto significa que algunos ejemplos, como el multiprocessing.pool.Pool los ejemplos no funcionarán en el intérprete interactivo. Por ejemplo:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Si intenta esto, en realidad generará tres trazas completas intercaladas de manera semi-aleatoria, y luego es posible que deba detener el proceso principal de alguna manera).

Referencia

los multiprocessing paquete en su mayoría replica la API del threading módulo.

Process y excepciones

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Los objetos de proceso representan la actividad que se ejecuta en un proceso separado. los Process clase tiene equivalentes de todos los métodos de threading.Thread.

El constructor siempre debe llamarse con argumentos de palabras clave. grupo siempre debe ser None; existe únicamente por compatibilidad con threading.Thread. objetivo es el objeto invocable que será invocado por el run() método. Por defecto es None, lo que significa que no se llama nada. nombre es el nombre del proceso (ver name para más detalles). argumentos es la tupla de argumentos para la invocación de destino. kwargs es un diccionario de argumentos de palabras clave para la invocación de destino. Si se proporciona, la palabra clave demonio argumento establece el proceso daemon bandera a True o False. Si None (el valor predeterminado), esta bandera se heredará del proceso de creación.

De forma predeterminada, no se pasan argumentos a objetivo.

Si una subclase anula el constructor, debe asegurarse de que invoca al constructor de la clase base (Process.__init__()) antes de hacer cualquier otra cosa en el proceso.

Modificado en la versión 3.3: Agregó el demonio argumento.

run()

Método que representa la actividad del proceso.

Puede anular este método en una subclase. El estandar run() El método invoca el objeto invocable pasado al constructor del objeto como argumento de destino, si lo hay, con argumentos secuenciales y de palabra clave tomados del argumentos y kwargs argumentos, respectivamente.

start()

Inicie la actividad del proceso.

Esto se debe llamar como máximo una vez por objeto de proceso. Organiza el objeto run() método para ser invocado en un proceso separado.

join([timeout])

Si el argumento opcional se acabó el tiempo es None (el predeterminado), el método se bloquea hasta que el proceso cuyo join() se llama al método termina. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos. Tenga en cuenta que el método devuelve None si su proceso termina o si el método se agota. Verifique el proceso exitcode para determinar si terminó.

Un proceso se puede unir muchas veces.

Un proceso no puede unirse a sí mismo porque esto provocaría un interbloqueo. Es un error intentar unirse a un proceso antes de que se haya iniciado.

name

El nombre del proceso. El nombre es una cadena que se utiliza únicamente con fines de identificación. No tiene semántica. A varios procesos se les puede dar el mismo nombre.

El nombre inicial lo establece el constructor. Si no se proporciona un nombre explícito al constructor, un nombre de la forma ‘Proceso-N1:NORTE2:…:NORTEk‘se construye, donde cada Nk es el enésimo hijo de su padre.

is_alive()

Devuelve si el proceso está vivo.

Aproximadamente, un objeto de proceso está vivo desde el momento en que start() El método regresa hasta que termina el proceso hijo.

daemon

La bandera del demonio del proceso, un valor booleano. Esto debe establecerse antes start() se llama.

El valor inicial se hereda del proceso de creación.

Cuando un proceso sale, intenta terminar todos sus procesos secundarios demoníacos.

Tenga en cuenta que un proceso demoníaco no puede crear procesos secundarios. De lo contrario, un proceso demoníaco dejaría huérfanos a sus hijos si se termina cuando sale su proceso padre. Además, estos son no Demonios o servicios Unix, son procesos normales que se terminarán (y no se unirán) si los procesos no demoníacos han salido.

Además de threading.Thread API, Process Los objetos también admiten los siguientes atributos y métodos:

pid

Devuelve el ID del proceso. Antes de que se genere el proceso, esto será None.

exitcode

El código de salida del niño. Esto será None si el proceso aún no ha terminado. Un valor negativo -NORTE indica que el niño fue despedido por señal norte.

authkey

La clave de autenticación del proceso (una cadena de bytes).

Cuando multiprocessing se inicializa al proceso principal se le asigna una cadena aleatoria usando os.urandom().

Cuando una Process se crea el objeto, heredará la clave de autenticación de su proceso padre, aunque esto se puede cambiar estableciendo authkey a otra cadena de bytes.

Ver Claves de autenticación.

sentinel

Un identificador numérico de un objeto del sistema que estará “listo” cuando finalice el proceso.

Puede usar este valor si desea esperar en varios eventos a la vez usando multiprocessing.connection.wait(). De lo contrario llamando join() es más simple.

En Windows, este es un identificador de sistema operativo que se puede usar con el WaitForSingleObject y WaitForMultipleObjects familia de llamadas API. En Unix, este es un descriptor de archivo que se puede utilizar con primitivas del select módulo.

Nuevo en la versión 3.3.

terminate()

Termina el proceso. En Unix esto se hace usando el SIGTERM señal; en Windows TerminateProcess() se utiliza. Tenga en cuenta que los controladores de salida y las cláusulas finalmente, etc., no se ejecutarán.

Tenga en cuenta que los procesos descendientes del proceso no terminar, simplemente quedarán huérfanos.

Advertencia

Si se usa este método cuando el proceso asociado está usando una tubería o una cola, entonces la tubería o la cola pueden corromperse y pueden volverse inutilizables por otro proceso. De manera similar, si el proceso ha adquirido un bloqueo o semáforo, etc., su terminación puede provocar que otros procesos se bloqueen.

kill()

Igual que terminate() pero usando el SIGKILL señal en Unix.

Nuevo en la versión 3.7.

close()

Cierra el Process objeto, liberando todos los recursos asociados con él. ValueError se genera si el proceso subyacente aún se está ejecutando. Una vez close() devuelve con éxito, la mayoría de los otros métodos y atributos del Process el objeto se levantará ValueError.

Nuevo en la versión 3.7.

Tenga en cuenta que el start(), join(), is_alive(), terminate() y exitcode Los métodos solo deben ser llamados por el proceso que creó el objeto de proceso.

Ejemplo de uso de algunos de los métodos de Process:

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

La clase base de todos multiprocessing excepciones.

exception multiprocessing.BufferTooShort

Excepción planteada por Connection.recv_bytes_into() cuando el objeto de búfer proporcionado es demasiado pequeño para la lectura del mensaje.

Si e es una instancia de BufferTooShort luego e.args[0] dará el mensaje como una cadena de bytes.

exception multiprocessing.AuthenticationError

Se genera cuando hay un error de autenticación.

exception multiprocessing.TimeoutError

Generado por métodos con un tiempo de espera cuando expira el tiempo de espera.

Tuberías y colas

Cuando se usan múltiples procesos, generalmente se usa el paso de mensajes para la comunicación entre procesos y se evita tener que usar primitivas de sincronización como bloqueos.

Para pasar mensajes, uno puede usar Pipe() (para una conexión entre dos procesos) o una cola (que permite múltiples productores y consumidores).

los Queue, SimpleQueue y JoinableQueue los tipos son de múltiples productores, de múltiples consumidores FIFO colas modeladas en el queue.Queue clase en la biblioteca estándar. Se diferencian en eso Queue carece del task_done() y join() métodos introducidos en Python 2.5’s queue.Queue clase.

Si utiliza JoinableQueue entonces tú debe llama JoinableQueue.task_done() para cada tarea eliminada de la cola o, de lo contrario, el semáforo utilizado para contar el número de tareas sin terminar puede eventualmente desbordarse, generando una excepción.

Tenga en cuenta que también se puede crear una cola compartida utilizando un objeto de administrador; consulte Gerentes.

Nota

multiprocessing usa el habitual queue.Empty y queue.Full excepciones para señalar un tiempo de espera. No están disponibles en el multiprocessing espacio de nombres, por lo que debe importarlos desde queue.

Nota

Cuando un objeto se coloca en una cola, el objeto se decapa y un subproceso de fondo luego descarga los datos decapados a una tubería subyacente. Esto tiene algunas consecuencias que son un poco sorprendentes, pero no deberían causar ninguna dificultad práctica; si realmente te molestan, puedes usar una cola creada con un gerente.

  1. Después de poner un objeto en una cola vacía, puede haber un retraso infinitesimal antes de que la cola empty() devuelve el método False y get_nowait() puede volver sin levantar queue.Empty.
  2. Si varios procesos están colocando objetos en cola, es posible que los objetos se reciban en el otro extremo fuera de servicio. Sin embargo, los objetos puestos en cola por el mismo proceso siempre estarán en el orden esperado entre sí.

Advertencia

Si un proceso se mata usando Process.terminate() o os.kill() mientras intenta utilizar un Queue, es probable que los datos de la cola se corrompan. Esto puede hacer que cualquier otro proceso obtenga una excepción cuando intente usar la cola más adelante.

Advertencia

Como se mencionó anteriormente, si un proceso hijo ha puesto elementos en una cola (y no ha utilizado JoinableQueue.cancel_join_thread), ese proceso no terminará hasta que todos los elementos almacenados en búfer se hayan descargado en la tubería.

Esto significa que si intenta unirse a ese proceso, puede llegar a un punto muerto a menos que esté seguro de que se han consumido todos los elementos que se colocaron en la cola. De manera similar, si el proceso hijo no es demoníaco, entonces el proceso padre puede colgarse al salir cuando intente unirse a todos sus hijos no demoníacos.

Tenga en cuenta que una cola creada con un administrador no tiene este problema. Ver Pautas de programación.

Para ver un ejemplo del uso de colas para la comunicación entre procesos, consulte Ejemplos de.

multiprocessing.Pipe([duplex])

Devuelve un par (conn1, conn2) de Connection objetos que representan los extremos de una tubería.

Si dúplex es True (el valor predeterminado) entonces la tubería es bidireccional. Si dúplex es False entonces la tubería es unidireccional: conn1 solo se puede utilizar para recibir mensajes y conn2 solo se puede utilizar para enviar mensajes.

class multiprocessing.Queue([maxsize])

Devuelve una cola de proceso compartida implementada mediante una tubería y algunos bloqueos / semáforos. Cuando un proceso coloca por primera vez un elemento en la cola, se inicia un subproceso alimentador que transfiere objetos desde un búfer a la tubería.

Lo normal queue.Empty y queue.Full excepciones de la biblioteca estándar queue módulo se elevan para señalar tiempos de espera.

Queue implementa todos los métodos de queue.Queue excepto por task_done() y join().

qsize()

Devuelve el tamaño aproximado de la cola. Debido a la semántica de multiproceso / multiprocesamiento, este número no es confiable.

Tenga en cuenta que esto puede aumentar NotImplementedError en plataformas Unix como Mac OS X donde sem_getvalue() no está implementado.

empty()

Regreso True si la cola está vacía, False de lo contrario. Debido a la semántica de multiproceso / multiprocesamiento, esto no es confiable.

full()

Regreso True si la cola está llena, False de lo contrario. Debido a la semántica de multiproceso / multiprocesamiento, esto no es confiable.

put(obj[, block[, timeout]])

Pon obj en la cola. Si el argumento opcional cuadra es True (el predeterminado) y se acabó el tiempo es None (predeterminado), bloquee si es necesario hasta que haya un espacio libre disponible. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos y levanta el queue.Full Excepción si no hay espacio libre disponible dentro de ese tiempo. De lo contrario (cuadra es False), ponga un elemento en la cola si hay un espacio libre disponible de inmediato; de lo contrario, aumente el queue.Full excepciónse acabó el tiempo se ignora en ese caso).

Modificado en la versión 3.8: Si la cola está cerrada, ValueError se levanta en lugar de AssertionError.

put_nowait(obj)

Equivalente a put(obj, False).

get([block[, timeout]])

Eliminar y devolver un artículo de la cola. Si argumentos opcionales cuadra es True (el predeterminado) y se acabó el tiempo es None (el valor predeterminado), bloquee si es necesario hasta que un elemento esté disponible. Si se acabó el tiempo es un número positivo, bloquea como máximo se acabó el tiempo segundos y levanta el queue.Empty excepción si no había ningún artículo disponible dentro de ese tiempo. De lo contrario (el bloque es False), devuelva un artículo si hay uno disponible de inmediato; de lo contrario, aumente el queue.Empty excepciónse acabó el tiempo se ignora en ese caso).

Modificado en la versión 3.8: Si la cola está cerrada, ValueError se levanta en lugar de OSError.

get_nowait()

Equivalente a get(False).

multiprocessing.Queue tiene algunos métodos adicionales que no se encuentran en queue.Queue. Estos métodos suelen ser innecesarios para la mayoría de los códigos:

close()

Indique que el proceso actual no pondrá más datos en esta cola. El subproceso en segundo plano se cerrará una vez que haya vaciado todos los datos almacenados en búfer en la tubería. Este es se llama automáticamente cuando la cola se recolecta como basura.

join_thread()

Únase al hilo de fondo. Esto solo se puede usar después close() ha sido llamado. Se bloquea hasta que sale el subproceso de fondo, lo que garantiza que todos los datos del búfer se hayan descargado en la tubería.

De forma predeterminada, si un proceso no es el creador de la cola, al salir intentará unirse al hilo de fondo de la cola. El proceso puede llamar cancel_join_thread() para hacer join_thread() hacer nada.

cancel_join_thread()

Evitar join_thread() del bloqueo. En particular, esto evita que el subproceso en segundo plano se una automáticamente cuando se cierra el proceso; consulte join_thread().

Un mejor nombre para este método podría ser allow_exit_without_flush(). Es probable que provoque la pérdida de datos en cola y es casi seguro que no necesitará utilizarlos. En realidad, solo está allí si necesita que el proceso actual salga de inmediato sin esperar a descargar los datos en cola a la tubería subyacente, y no le importa la pérdida de datos.

Nota

La funcionalidad de esta clase requiere una implementación de semáforo compartida que funcione en el sistema operativo host. Sin uno, la funcionalidad de esta clase se desactivará e intentará crear una instancia Queue resultará en un ImportError. Ver bpo-3770 para informacion adicional. Lo mismo es válido para cualquiera de los tipos de cola especializados que se enumeran a continuación.

class multiprocessing.SimpleQueue

Es un simplificado Queue tipo, muy cerca de un bloqueado Pipe.

close()

Cerrar la cola: liberar recursos internos.

Una cola no se debe utilizar más después de que se cierre. Por ejemplo, get(), put() y empty() ya no se deben llamar a los métodos.

Nuevo en la versión 3.9.

empty()

Regreso True si la cola está vacía, False de lo contrario.

get()

Eliminar y devolver un artículo de la cola.

put(item)

Poner artículo en la cola.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclase, es una cola que además tiene task_done() y join() métodos.

task_done()

Indica que una tarea en cola anteriormente está completa. Usado por consumidores de cola. Para cada get() utilizado para buscar una tarea, una llamada posterior a task_done() le dice a la cola que el procesamiento de la tarea está completo.

Si un join() está bloqueando actualmente, se reanudará cuando todos los elementos hayan sido procesados ​​(lo que significa que task_done() se recibió una llamada por cada artículo que se había put() en la cola).

Plantea un ValueError si se llama más veces que los elementos colocados en la cola.

join()

Bloquear hasta que se hayan obtenido y procesado todos los elementos de la cola.

El recuento de tareas sin terminar aumenta cada vez que se agrega un elemento a la cola. El recuento desciende cada vez que un consumidor llama task_done() para indicar que se recuperó el elemento y que todo el trabajo en él está completo. Cuando el recuento de tareas sin terminar cae a cero, join() desbloquea.

Diverso

multiprocessing.active_children()

Devuelve la lista de todos los hijos vivos del proceso actual.

Llamar a esto tiene el efecto secundario de “unirse” a cualquier proceso que ya haya finalizado.

multiprocessing.cpu_count()

Devuelve el número de CPU del sistema.

Este número no es equivalente al número de CPU que puede utilizar el proceso actual. El número de CPU utilizables se puede obtener con len(os.sched_getaffinity(0))

Puede levantar NotImplementedError.

Ver también

os.cpu_count()

multiprocessing.current_process()

Devuelve el Process objeto correspondiente al proceso actual.

Un análogo de threading.current_thread().

multiprocessing.parent_process()

Devuelve el Process objeto correspondiente al proceso padre del current_process(). Para el proceso principal, parent_process estarán None.

Nuevo en la versión 3.8.

multiprocessing.freeze_support()

Agregue soporte para cuando un programa que usa multiprocessing se ha congelado para producir un ejecutable de Windows. (Ha sido probado con py2exe, PyInstaller y cx_Freeze.)

Es necesario llamar a esta función inmediatamente después de la if __name__ ==
'__main__'
línea del módulo principal. Por ejemplo:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Si el freeze_support() se omite la línea, luego, al intentar ejecutar el ejecutable congelado, se generará RuntimeError.

Vocación freeze_support() no tiene ningún efecto cuando se invoca en cualquier sistema operativo que no sea Windows. Además, si el intérprete de Python ejecuta normalmente el módulo en Windows (el programa no se ha congelado), entonces freeze_support() no tiene efecto.

multiprocessing.get_all_start_methods()

Devuelve una lista de los métodos de inicio admitidos, el primero de los cuales es el predeterminado. Los posibles métodos de inicio son 'fork', 'spawn' y 'forkserver'. Solo en Windows 'spawn' está disponible. En Unix 'fork' y 'spawn' siempre son compatibles, con 'fork' siendo el predeterminado.

Nuevo en la versión 3.4.

multiprocessing.get_context(method=None)

Devuelve un objeto de contexto que tiene los mismos atributos que el multiprocessing módulo.

Si método es None luego se devuelve el contexto predeterminado. De lo contrario método debiera ser 'fork', 'spawn', 'forkserver'. ValueError se genera si el método de inicio especificado no está disponible.

Nuevo en la versión 3.4.

multiprocessing.get_start_method(allow_none=False)

Devuelve el nombre del método de inicio utilizado para iniciar los procesos.

Si el método de inicio no se ha corregido y allow_none es falso, entonces el método de inicio se fija al predeterminado y se devuelve el nombre. Si el método de inicio no se ha corregido y allow_none es verdad entonces None es regresado.

El valor de retorno puede ser 'fork', 'spawn', 'forkserver' o None. 'fork' es el predeterminado en Unix, mientras que 'spawn' es el predeterminado en Windows.

Nuevo en la versión 3.4.

multiprocessing.set_executable()

Establece la ruta del intérprete de Python que se utilizará al iniciar un proceso hijo. (Por defecto sys.executable se utiliza). Los incrustadores probablemente necesitarán hacer algo como

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

antes de que puedan crear procesos secundarios.

Modificado en la versión 3.4: Ahora compatible con Unix cuando el 'spawn' se utiliza el método de inicio.

multiprocessing.set_start_method(method)

Establezca el método que se debe utilizar para iniciar procesos secundarios. método puede ser 'fork', 'spawn' o 'forkserver'.

Tenga en cuenta que esto debe llamarse como máximo una vez y debe protegerse dentro del if __name__ == '__main__' cláusula del módulo principal.

Nuevo en la versión 3.4.

Nota

multiprocessing no contiene análogos de threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer, o threading.local.

Objetos de conexión

Los objetos de conexión permiten el envío y la recepción de cadenas o objetos decapables. Se pueden considerar como enchufes conectados orientados a mensajes.

Los objetos de conexión se crean generalmente usando Pipe – ver también Oyentes y Clientes.

class multiprocessing.connection.Connection
send(obj)

Envíe un objeto al otro extremo de la conexión que debe leerse usando recv().

El objeto debe ser decapado. Los encurtidos muy grandes (aproximadamente 32 MiB +, aunque depende del sistema operativo) pueden generar un ValueError excepción.

recv()

Devuelve un objeto enviado desde el otro extremo de la conexión usando send(). Bloquea hasta que haya algo que recibir. Eleva EOFError si no queda nada para recibir y el otro extremo estaba cerrado.

fileno()

Devuelve el descriptor de archivo o el identificador utilizado por la conexión.

close()

Cierra la conexión.

Esto se llama automáticamente cuando la conexión se recolecta como basura.

poll([timeout])

Devuelve si hay datos disponibles para leer.

Si se acabó el tiempo no se especifica, volverá inmediatamente. Si se acabó el tiempo es un número, entonces esto especifica el tiempo máximo en segundos para bloquear. Si se acabó el tiempo es None entonces se usa un tiempo de espera infinito.

Tenga en cuenta que se pueden sondear varios objetos de conexión a la vez mediante el uso de multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Enviar datos de bytes desde un objeto similar a bytes como un mensaje completo.

Si compensar se da, entonces los datos se leen desde esa posición en buffer. Si Talla Se da entonces que se leerán muchos bytes del búfer. Los búferes muy grandes (aproximadamente 32 MiB +, aunque depende del sistema operativo) pueden generar un ValueError excepción

recv_bytes([maxlength])

Devuelve un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión como una cadena. Bloquea hasta que haya algo que recibir. Eleva EOFError si no queda nada para recibir y el otro extremo se ha cerrado.

Si longitud máxima se especifica y el mensaje es más largo que longitud máxima luego OSError se activa y la conexión ya no será legible.

Modificado en la versión 3.3: Esta función solía aumentar IOError, que ahora es un alias de OSError.

recv_bytes_into(buffer[, offset])

Interpretar buffer un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión y devuelve el número de bytes en el mensaje. Bloquea hasta que haya algo que recibir. Eleva EOFError si no queda nada para recibir y el otro extremo estaba cerrado.

buffer debe ser un escribible objeto similar a bytes. Si compensar se da, entonces el mensaje se escribirá en el búfer desde esa posición. El desplazamiento debe ser un número entero no negativo menor que la longitud de buffer (en bytes).

Si el búfer es demasiado corto, BufferTooShort se genera una excepción y el mensaje completo está disponible como e.args[0] dónde e es la instancia de excepción.

Modificado en la versión 3.3: Los propios objetos de conexión ahora se pueden transferir entre procesos utilizando Connection.send() y Connection.recv().

Nuevo en la versión 3.3: Los objetos de conexión ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto. __enter__() devuelve el objeto de conexión y __exit__() llamadas close().

Por ejemplo:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Advertencia

los Connection.recv() El método elimina automáticamente los datos que recibe, lo que puede suponer un riesgo para la seguridad a menos que pueda confiar en el proceso que envió el mensaje.

Por lo tanto, a menos que el objeto de conexión se haya producido utilizando Pipe() solo debes usar el recv() y send() métodos después de realizar algún tipo de autenticación. Ver Claves de autenticación.

Advertencia

Si un proceso se mata mientras intenta leer o escribir en una tubería, es probable que los datos de la tubería se corrompan, porque puede resultar imposible estar seguro de dónde se encuentran los límites del mensaje.

Primitivas de sincronización

Generalmente, las primitivas de sincronización no son tan necesarias en un programa multiproceso como lo son en un programa multiproceso. Consulte la documentación para threading módulo.

Tenga en cuenta que también se pueden crear primitivas de sincronización utilizando un objeto administrador; consulte Gerentes.

class multiprocessing.Barrier(parties[, action[, timeout]])

Un objeto barrera: un clon de threading.Barrier.

Nuevo en la versión 3.3.

class multiprocessing.BoundedSemaphore([value])

Un objeto semáforo acotado: un análogo cercano de threading.BoundedSemaphore.

Existe una diferencia solitaria de su análogo cercano: su acquire el primer argumento del método se llama cuadra, como es consistente con Lock.acquire().

Nota

En Mac OS X, esto es indistinguible de Semaphore porque sem_getvalue() no está implementado en esa plataforma.

class multiprocessing.Condition([lock])

Una variable de condición: un alias para threading.Condition.

Si cerrar con llave se especifica, entonces debería ser un Lock o RLock objeto de multiprocessing.

Modificado en la versión 3.3: los wait_for() se agregó el método.

class multiprocessing.Event

Un clon de threading.Event.

class multiprocessing.Lock

Un objeto de bloqueo no recursivo: un análogo cercano de threading.Lock. Una vez que un proceso o subproceso ha adquirido un bloqueo, los intentos posteriores de adquirirlo de cualquier proceso o subproceso se bloquearán hasta que se libere; cualquier proceso o hilo puede liberarlo. Los conceptos y comportamientos de threading.Lock como se aplica a los subprocesos se replican aquí en multiprocessing.Lock ya que se aplica a procesos o subprocesos, excepto cuando se indique.

Tenga en cuenta que Lock es en realidad una función de fábrica que devuelve una instancia de multiprocessing.synchronize.Lock inicializado con un contexto predeterminado.

Lock apoya el administrador de contexto protocolo y, por lo tanto, se puede utilizar en with declaraciones.

acquire(block=True, timeout=None)

Adquirir un candado, bloqueante o no bloqueante.

Con el cuadra argumento establecido en True (el valor predeterminado), la llamada al método se bloqueará hasta que el bloqueo esté en un estado desbloqueado, luego configúrelo como bloqueado y regrese True. Tenga en cuenta que el nombre de este primer argumento difiere del de threading.Lock.acquire().

Con el cuadra argumento establecido en False, la llamada al método no se bloquea. Si la cerradura se encuentra actualmente en un estado bloqueado, regrese False; de lo contrario, establezca el bloqueo en un estado bloqueado y vuelva True.

Cuando se invoca con un valor de punto flotante positivo para se acabó el tiempo, bloquea como máximo el número de segundos especificado por se acabó el tiempo siempre que no se pueda adquirir el candado. Invocaciones con un valor negativo para se acabó el tiempo son equivalentes a un se acabó el tiempo de cero. Invocaciones con un se acabó el tiempo valor de None (el valor predeterminado) establece el período de tiempo de espera en infinito. Tenga en cuenta que el tratamiento de negativos o None valores para se acabó el tiempo difiere del comportamiento implementado en threading.Lock.acquire(). los se acabó el tiempo argumento no tiene implicaciones prácticas si el cuadra el argumento se establece en False y por lo tanto se ignora. Devoluciones True si la cerradura ha sido adquirida o False si ha transcurrido el tiempo de espera.

release()

Suelta un candado. Esto se puede llamar desde cualquier proceso o subproceso, no solo el proceso o subproceso que originalmente adquirió el bloqueo.

El comportamiento es el mismo que en threading.Lock.release() excepto que cuando se invoca en un candado desbloqueado, un ValueError es elevado.

class multiprocessing.RLock

Un objeto de bloqueo recursivo: un análogo cercano de threading.RLock. Un bloqueo recursivo debe ser liberado por el proceso o subproceso que lo adquirió. Una vez que un proceso o subproceso ha adquirido un bloqueo recursivo, el mismo proceso o subproceso puede adquirirlo nuevamente sin bloquearse; ese proceso o subproceso debe liberarlo una vez por cada vez que se haya adquirido.

Tenga en cuenta que RLock es en realidad una función de fábrica que devuelve una instancia de multiprocessing.synchronize.RLock inicializado con un contexto predeterminado.

RLock apoya el administrador de contexto protocolo y, por lo tanto, se puede utilizar en with declaraciones.

acquire(block=True, timeout=None)

Adquirir un candado, bloqueante o no bloqueante.

Cuando se invoca con el cuadra argumento establecido en True, bloquear hasta que el bloqueo esté en un estado desbloqueado (que no sea propiedad de ningún proceso o subproceso) a menos que el bloqueo ya sea propiedad del proceso o subproceso actual. El proceso o subproceso actual toma posesión del bloqueo (si aún no lo tiene) y el nivel de recursividad dentro del bloqueo se incrementa en uno, lo que da como resultado un valor de retorno de True. Tenga en cuenta que hay varias diferencias en el comportamiento de este primer argumento en comparación con la implementación de threading.RLock.acquire(), comenzando con el nombre del argumento en sí.

Cuando se invoca con el cuadra argumento establecido en False, no bloquees. Si el bloqueo ya ha sido adquirido (y por lo tanto es propiedad) de otro proceso o subproceso, el proceso o subproceso actual no toma posesión y el nivel de recursividad dentro del bloqueo no cambia, lo que da como resultado un valor de retorno de False. Si el bloqueo está en un estado desbloqueado, el proceso o subproceso actual toma posesión y el nivel de recursividad se incrementa, lo que da como resultado un valor de retorno de True.

Uso y comportamientos del se acabó el tiempo argumento son los mismos que en Lock.acquire(). Tenga en cuenta que algunos de estos comportamientos de se acabó el tiempo difieren de los comportamientos implementados en threading.RLock.acquire().

release()

Suelte un bloqueo, disminuyendo el nivel de recursividad. Si después del decremento el nivel de recursividad es cero, restablezca el bloqueo a desbloqueado (que no sea propiedad de ningún proceso o subproceso) y si cualquier otro proceso o subproceso está bloqueado esperando que el bloqueo se desbloquee, permita que continúe exactamente uno de ellos. Si después del decremento el nivel de recursividad sigue siendo distinto de cero, el bloqueo permanece bloqueado y es propiedad del proceso o subproceso que realiza la llamada.

Solo llame a este método cuando el proceso o subproceso que realiza la llamada sea propietario del bloqueo. Un AssertionError se genera si este método es llamado por un proceso o subproceso que no sea el propietario o si el bloqueo está en un estado desbloqueado (sin propietario). Tenga en cuenta que el tipo de excepción planteada en esta situación difiere del comportamiento implementado en threading.RLock.release().

class multiprocessing.Semaphore([value])

Un objeto semáforo: un análogo cercano de threading.Semaphore.

Existe una diferencia solitaria de su análogo cercano: su acquire el primer argumento del método se llama cuadra, como es consistente con Lock.acquire().

Nota

En Mac OS X, sem_timedwait no es compatible, así que llamando acquire() con un tiempo de espera emulará el comportamiento de esa función utilizando un ciclo de suspensión.

Nota

Si la señal SIGINT generada por Ctrl-C llega mientras el hilo principal está bloqueado por una llamada a BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() o Condition.wait() entonces la llamada se interrumpirá inmediatamente y KeyboardInterrupt se levantará.

Esto difiere del comportamiento de threading donde SIGINT se ignorará mientras las llamadas de bloqueo equivalentes estén en curso.

Nota

Algunas de las funciones de este paquete requieren una implementación de semáforo compartida que funcione en el sistema operativo host. Sin uno, el multiprocessing.synchronize El módulo se deshabilitará y los intentos de importarlo darán como resultado un ImportError. Ver bpo-3770 para informacion adicional.

Compartido ctypes Objetos

Es posible crear objetos compartidos utilizando memoria compartida que puede ser heredada por procesos secundarios.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Devolver un ctypes objeto asignado desde la memoria compartida. De forma predeterminada, el valor de retorno es en realidad un contenedor sincronizado para el objeto. Se puede acceder al objeto en sí a través del valor atributo de un Value.

typecode_or_type determina el tipo del objeto devuelto: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el array módulo. * argumentos se pasa al constructor para el tipo.

Si cerrar con llave es True (valor predeterminado), se crea un nuevo objeto de bloqueo recursivo para sincronizar el acceso al valor. Si cerrar con llave es un Lock o RLock objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave es False entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.

Operaciones como += que implican una lectura y una escritura no son atómicas. Entonces, si, por ejemplo, desea incrementar atómicamente un valor compartido, no es suficiente con hacerlo

counter.value += 1

Suponiendo que el bloqueo asociado es recursivo (que es de forma predeterminada), puede hacerlo

with counter.get_lock():
    counter.value += 1

Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Devuelve una matriz ctypes asignada desde la memoria compartida. De forma predeterminada, el valor de retorno es en realidad un contenedor sincronizado para la matriz.

typecode_or_type determina el tipo de los elementos de la matriz devuelta: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el array módulo. Si size_or_initializer es un número entero, luego determina la longitud de la matriz, y la matriz se pondrá inicialmente a cero. De lo contrario, size_or_initializer es una secuencia que se utiliza para inicializar la matriz y cuya longitud determina la longitud de la matriz.

Si cerrar con llave es True (predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es un Lock o RLock objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave es False entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.

Tenga en cuenta que cerrar con llave es un argumento de solo palabra clave.

Tenga en cuenta que una matriz de ctypes.c_char tiene valor y crudo atributos que permiten usarlo para almacenar y recuperar cadenas.

los multiprocessing.sharedctypes módulo

los multiprocessing.sharedctypes El módulo proporciona funciones para asignar ctypes objetos de la memoria compartida que pueden ser heredados por procesos secundarios.

Nota

Aunque es posible almacenar un puntero en la memoria compartida, recuerde que esto hará referencia a una ubicación en el espacio de direcciones de un proceso específico. Sin embargo, es muy probable que el puntero no sea válido en el contexto de un segundo proceso y tratar de eliminar la referencia del puntero del segundo proceso puede provocar un bloqueo.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Devuelve una matriz ctypes asignada desde la memoria compartida.

typecode_or_type determina el tipo de los elementos de la matriz devuelta: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el array módulo. Si size_or_initializer es un número entero, entonces determina la longitud de la matriz, y la matriz se pondrá inicialmente a cero. De lo contrario size_or_initializer es una secuencia que se utiliza para inicializar la matriz y cuya longitud determina la longitud de la matriz.

Tenga en cuenta que configurar y obtener un elemento es potencialmente no atómico: use Array() en su lugar, para asegurarse de que el acceso se sincronice automáticamente mediante un candado.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Devuelve un objeto ctypes asignado desde la memoria compartida.

typecode_or_type determina el tipo del objeto devuelto: es un tipo ctypes o un código de tipo de un carácter del tipo utilizado por el array módulo. * argumentos se pasa al constructor para el tipo.

Tenga en cuenta que configurar y obtener el valor es potencialmente no atómico: use Value() en su lugar, para asegurarse de que el acceso se sincronice automáticamente mediante un candado.

Tenga en cuenta que una matriz de ctypes.c_char tiene value y raw atributos que permiten usarlo para almacenar y recuperar cadenas; consulte la documentación para ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

Lo mismo que RawArray() excepto que dependiendo del valor de cerrar con llave se puede devolver un contenedor de sincronización seguro para el proceso en lugar de una matriz ctypes sin formato.

Si cerrar con llave es True (predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es un Lock o RLock objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave es False entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.

Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

Lo mismo que RawValue() excepto que dependiendo del valor de cerrar con llave se puede devolver un contenedor de sincronización seguro para el proceso en lugar de un objeto ctypes sin formato.

Si cerrar con llave es True (predeterminado), se crea un nuevo objeto de bloqueo para sincronizar el acceso al valor. Si cerrar con llave es un Lock o RLock objeto, que se utilizará para sincronizar el acceso al valor. Si cerrar con llave es False entonces el acceso al objeto devuelto no estará protegido automáticamente por un candado, por lo que no será necesariamente “seguro para el proceso”.

Tenga en cuenta que cerrar con llave es un argumento de solo palabras clave.

multiprocessing.sharedctypes.copy(obj)

Devuelve un objeto ctypes asignado desde la memoria compartida que es una copia del objeto ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Devuelve un objeto contenedor seguro para el proceso para un objeto ctypes que usa cerrar con llave para sincronizar el acceso. Si cerrar con llave es None (el predeterminado) luego un multiprocessing.RLock El objeto se crea automáticamente.

Un contenedor sincronizado tendrá dos métodos además de los del objeto que envuelve: get_obj() devuelve el objeto envuelto y get_lock() devuelve el objeto de bloqueo utilizado para la sincronización.

Tenga en cuenta que acceder al objeto ctypes a través del contenedor puede ser mucho más lento que acceder al objeto ctypes sin formato.

Modificado en la versión 3.5: Los objetos sincronizados admiten la administrador de contexto protocolo.

La siguiente tabla compara la sintaxis para crear objetos ctypes compartidos desde la memoria compartida con la sintaxis normal de ctypes. (En la mesa MyStruct es una subclase de ctypes.Structure.)

ctypes

sharedctypes usando el tipo

sharedctypes usando typecode

c_double (2,4)

RawValue (c_double, 2.4)

RawValue (‘d’, 2.4)

MyStruct (4, 6)

RawValue (MyStruct, 4, 6)

(c_corto * 7) ()

RawArray (c_short, 7)

RawArray (‘h’, 7)

(c_int * 3) (9, 2, 8)

RawArray (c_int, (9, 2, 8))

RawArray (‘i’, (9, 2, 8))

A continuación se muestra un ejemplo en el que un proceso hijo modifica varios objetos ctypes:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Los resultados impresos son

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Gerentes

Los administradores proporcionan una forma de crear datos que se pueden compartir entre diferentes procesos, incluido el intercambio a través de una red entre procesos que se ejecutan en diferentes máquinas. Un objeto administrador controla un proceso de servidor que administra objetos compartidos. Otros procesos pueden acceder a los objetos compartidos mediante el uso de proxies.

multiprocessing.Manager()

Devuelve un iniciado SyncManager objeto que se puede utilizar para compartir objetos entre procesos. El objeto de administrador devuelto corresponde a un proceso hijo generado y tiene métodos que crearán objetos compartidos y devolverán los proxies correspondientes.

Los procesos de administrador se cerrarán tan pronto como se recojan la basura o se cierre el proceso principal. Las clases de administradores se definen en el multiprocessing.managers módulo:

class multiprocessing.managers.BaseManager([address[, authkey]])

Cree un objeto BaseManager.

Una vez creado uno debe llamar start() o get_server().serve_forever() para asegurarse de que el objeto de administrador se refiere a un proceso de administrador iniciado.

Dirección es la dirección en la que el proceso de administrador escucha nuevas conexiones. Si Dirección es None luego se elige uno arbitrario.

Clave de autenticación es la clave de autenticación que se utilizará para verificar la validez de las conexiones entrantes al proceso del servidor. Si Clave de autenticación es None luego current_process().authkey se utiliza. De lo contrario Clave de autenticación se utiliza y debe ser una cadena de bytes.

start([initializer[, initargs]])

Inicie un subproceso para iniciar el administrador. Si inicializador no es None entonces el subproceso llamará initializer(*initargs) cuando comienza.

get_server()

Devuelve un Server objeto que representa el servidor real bajo el control del Administrador. los Server objeto soporta el serve_forever() método:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server además tiene un address atributo.

connect()

Conecte un objeto de administrador local a un proceso de administrador remoto:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Detenga el proceso utilizado por el gerente. Esto solo está disponible si start() se ha utilizado para iniciar el proceso del servidor.

Esto se puede llamar varias veces.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Un método de clase que se puede utilizar para registrar un tipo o llamar con la clase de administrador.

typeid es un “identificador de tipo” que se utiliza para identificar un tipo particular de objeto compartido. Debe ser una cadena.

invocable es un invocable que se utiliza para crear objetos para este tipo de identificador. Si una instancia de administrador se conectará al servidor mediante el connect() método, o si el create_method el argumento es False entonces esto se puede dejar como None.

tipo de proxy es una subclase de BaseProxy que se utiliza para crear proxies para objetos compartidos con este typeid. Si None luego, se crea automáticamente una clase de proxy.

expuesto se utiliza para especificar una secuencia de nombres de métodos a los que se debe permitir el acceso de los proxies para este typeid utilizando BaseProxy._callmethod(). (Si expuesto es None luego proxytype._exposed_ se utiliza en su lugar si existe.) En el caso de que no se especifique una lista expuesta, todos los “métodos públicos” del objeto compartido serán accesibles. (Aquí, un “método público” significa cualquier atributo que tiene un __call__() método y cuyo nombre no comienza con '_'.)

method_to_typeid es un mapeo utilizado para especificar el tipo de retorno de los métodos expuestos que deberían devolver un proxy. Asigna nombres de métodos a cadenas de typeid. (Si method_to_typeid es None luego proxytype._method_to_typeid_ se utiliza en su lugar si existe.) Si el nombre de un método no es una clave de este mapeo o si el mapeo es None luego, el objeto devuelto por el método se copiará por valor.

create_method determina si se debe crear un método con nombre typeid que se puede usar para decirle al proceso del servidor que cree un nuevo objeto compartido y le devuelva un proxy. Por defecto es True.

BaseManager Las instancias también tienen una propiedad de solo lectura:

address

La dirección utilizada por el gerente.

Modificado en la versión 3.3: Los objetos de administrador admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto. __enter__() inicia el proceso del servidor (si aún no se ha iniciado) y luego devuelve el objeto administrador. __exit__() llamadas shutdown().

En versiones anteriores __enter__() no inició el proceso del servidor del administrador si aún no se había iniciado.

class multiprocessing.managers.SyncManager

Una subclase de BaseManager que se puede utilizar para la sincronización de procesos. Los objetos de este tipo son devueltos por multiprocessing.Manager().

Sus métodos crean y regresan Objetos proxy para que varios tipos de datos de uso común se sincronicen entre procesos. Esto incluye, en particular, listas y diccionarios compartidos.

Barrier(parties[, action[, timeout]])

Crea un compartido threading.Barrier objeto y devuelve un proxy para él.

Nuevo en la versión 3.3.

BoundedSemaphore([value])

Crea un compartido threading.BoundedSemaphore objeto y devuelve un proxy para él.

Condition([lock])

Crea un compartido threading.Condition objeto y devuelve un proxy para él.

Si cerrar con llave se proporciona, entonces debería ser un proxy para un threading.Lock o threading.RLock objeto.

Modificado en la versión 3.3: los wait_for() se agregó el método.

Event()

Crea un compartido threading.Event objeto y devuelve un proxy para él.

Lock()

Crea un compartido threading.Lock objeto y devuelve un proxy para él.

Namespace()

Crea un compartido Namespace objeto y devuelve un proxy para él.

Queue([maxsize])

Crea un compartido queue.Queue objeto y devuelve un proxy para él.

RLock()

Crea un compartido threading.RLock objeto y devuelve un proxy para él.

Semaphore([value])

Crea un compartido threading.Semaphore objeto y devuelve un proxy para él.

Array(typecode, sequence)

Cree una matriz y devuelva un proxy para ella.

Value(typecode, value)

Crea un objeto con escritura value atributo y devolver un proxy para él.

dict()
dict(mapping)
dict(sequence)

Crea un compartido dict objeto y devuelve un proxy para él.

list()
list(sequence)

Crea un compartido list objeto y devuelve un proxy para él.

Modificado en la versión 3.6: Los objetos compartidos se pueden anidar. Por ejemplo, un objeto contenedor compartido, como una lista compartida, puede contener otros objetos compartidos que serán administrados y sincronizados por el SyncManager.

class multiprocessing.managers.Namespace

Un tipo que puede registrarse con SyncManager.

Un objeto de espacio de nombres no tiene métodos públicos, pero tiene atributos de escritura. Su representación muestra los valores de sus atributos.

Sin embargo, cuando se usa un proxy para un objeto de espacio de nombres, un atributo que comienza con '_' será un atributo del proxy y no un atributo del referente:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Gerentes personalizados

Para crear su propio gerente, se crea una subclase de BaseManager y usa el register() classmethod para registrar nuevos tipos o invocables con la clase del administrador. Por ejemplo:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Usando un administrador remoto

Es posible ejecutar un servidor administrador en una máquina y hacer que los clientes lo usen desde otras máquinas (asumiendo que los firewalls involucrados lo permitan).

La ejecución de los siguientes comandos crea un servidor para una única cola compartida a la que pueden acceder los clientes remotos:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Un cliente puede acceder al servidor de la siguiente manera:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Otro cliente también puede usarlo:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Los procesos locales también pueden acceder a esa cola, utilizando el código de arriba en el cliente para acceder a ella de forma remota:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Objetos proxy

Un proxy es un objeto que se refiere a un objeto compartido que vive (presumiblemente) en un proceso diferente. Se dice que el objeto compartido es el referente del proxy. Varios objetos proxy pueden tener el mismo referente.

Un objeto proxy tiene métodos que invocan los métodos correspondientes de su referente (aunque no todos los métodos del referente estarán necesariamente disponibles a través del proxy). De esta forma, un proxy se puede utilizar como su referente:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Tenga en cuenta que aplicando str() a un proxy devolverá la representación del referente, mientras que la aplicación repr() devolverá la representación del apoderado.

Una característica importante de los objetos proxy es que se pueden seleccionar para que puedan pasarse entre procesos. Como tal, un referente puede contener Objetos proxy. Esto permite el anidamiento de estas listas administradas, dictados y otros Objetos proxy:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

De manera similar, los proxies dict y list pueden estar anidados uno dentro del otro:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Si es estándar (no proxy) list o dict los objetos están contenidos en un referente, las modificaciones a esos valores mutables no se propagarán a través del administrador porque el proxy no tiene forma de saber cuándo se modifican los valores contenidos dentro. Sin embargo, almacenar un valor en un proxy de contenedor (que desencadena una __setitem__ en el objeto proxy) se propaga a través del administrador y, por lo tanto, para modificar efectivamente dicho elemento, se podría reasignar el valor modificado al proxy contenedor:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Este enfoque es quizás menos conveniente que emplear anidados Objetos proxy para la mayoría de los casos de uso, pero también demuestra un nivel de control sobre la sincronización.

Nota

El proxy escribe multiprocessing No haga nada para respaldar las comparaciones por valor. Entonces, por ejemplo, tenemos:

>>> manager.list([1,2,3]) == [1,2,3]
False

En su lugar, se debe usar una copia del referente al hacer comparaciones.

class multiprocessing.managers.BaseProxy

Los objetos proxy son instancias de subclases de BaseProxy.

_callmethod(methodname[, args[, kwds]])

Llame y devuelva el resultado de un método del referente del proxy.

Si proxy es un proxy cuyo referente es obj luego la expresión

proxy._callmethod(methodname, args, kwds)

evaluará la expresión

getattr(obj, methodname)(*args, **kwds)

en el proceso del gerente.

El valor devuelto será una copia del resultado de la llamada o un proxy a un nuevo objeto compartido; consulte la documentación del method_to_typeid argumento de BaseManager.register().

Si la llamada genera una excepción, la vuelve a generar _callmethod(). Si se genera alguna otra excepción en el proceso del gerente, esto se convierte en una RemoteError excepción y es planteado por _callmethod().

Tenga en cuenta en particular que se generará una excepción si nombre del método no ha sido expuesto.

Un ejemplo del uso de _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Devuelva una copia del referente.

Si el referente no se puede seleccionar, se generará una excepción.

__repr__()

Devuelve una representación del objeto proxy.

__str__()

Devuelve la representación del referente.

Limpiar

Un objeto proxy usa una devolución de llamada débil de modo que cuando se recolecta la basura, se anula el registro del administrador que posee su referente.

Un objeto compartido se elimina del proceso de administrador cuando ya no hay proxies que se refieran a él.

Grupos de procesos

Se puede crear un conjunto de procesos que llevarán a cabo las tareas que se le envíen con el Pool clase.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Objeto de grupo de procesos que controla un grupo de procesos de trabajo a los que se pueden enviar trabajos. Admite resultados asincrónicos con tiempos de espera y devoluciones de llamada y tiene una implementación de mapa paralela.

procesos es el número de procesos de trabajo que se utilizarán. Si procesos es None luego el número devuelto por os.cpu_count() se utiliza.

Si inicializador no es None luego cada proceso de trabajador llamará initializer(*initargs) cuando comienza.

maxtasksperchild es la cantidad de tareas que un proceso de trabajo puede completar antes de salir y ser reemplazado por un proceso de trabajo nuevo, para permitir que se liberen los recursos no utilizados. El valor por defecto maxtasksperchild es None, lo que significa que los procesos de trabajo vivirán tanto como el grupo.

contexto se puede utilizar para especificar el contexto utilizado para iniciar los procesos de trabajo. Por lo general, un grupo se crea usando la función multiprocessing.Pool() o la Pool() método de un objeto de contexto. En ambos casos contexto está configurado correctamente.

Tenga en cuenta que los métodos del objeto de grupo solo deben ser invocados por el proceso que creó el grupo.

Advertencia

multiprocessing.pool los objetos tienen recursos internos que deben administrarse correctamente (como cualquier otro recurso) mediante el uso del grupo como administrador de contexto o llamando close() y terminate() a mano. No hacer esto puede hacer que el proceso se quede pendiente de la finalización.

Tenga en cuenta que es incorrecto confiar en el recolector de basura para destruir el grupo, ya que CPython no asegura que se llamará al finalizador del grupo (consulte object.__del__() para más información).

Nuevo en la versión 3.2: maxtasksperchild

Nuevo en la versión 3.4: contexto

Nota

Procesos de trabajo dentro de un Pool normalmente viven durante la duración completa de la cola de trabajo de la agrupación. Un patrón frecuente que se encuentra en otros sistemas (como Apache, mod_wsgi, etc.) para liberar recursos en poder de los trabajadores es permitir que un trabajador dentro de un grupo complete solo una cantidad determinada de trabajo antes de salir, limpiarse y generar un nuevo proceso. para reemplazar el anterior. los maxtasksperchild argumento a la Pool expone esta capacidad al usuario final.

apply(func[, args[, kwds]])

Llama func con argumentos argumentos y argumentos de palabras clave kwds. Bloquea hasta que el resultado está listo. Dados estos bloques, apply_async() es más adecuado para realizar trabajos en paralelo. Adicionalmente, func solo se ejecuta en uno de los trabajadores de la piscina.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Una variante del apply() método que devuelve un AsyncResult objeto.

Si llamar de vuelta se especifica, entonces debería ser un invocable que acepte un solo argumento. Cuando el resultado esté listo llamar de vuelta se le aplica, es decir, a menos que la llamada haya fallado, en cuyo caso el error_callback se aplica en su lugar.

Si error_callback se especifica, entonces debería ser un invocable que acepte un solo argumento. Si la función de destino falla, entonces la error_callback se llama con la instancia de excepción.

Las devoluciones de llamada deben completarse de inmediato, ya que de lo contrario se bloqueará el hilo que maneja los resultados.

map(func, iterable[, chunksize])

Un equivalente paralelo de la map() función incorporada (solo admite una iterable sin embargo, para varios iterables, consulte starmap()). Bloquea hasta que el resultado está listo.

Este método divide el iterable en varios fragmentos que envía al grupo de procesos como tareas separadas. El tamaño (aproximado) de estos trozos se puede especificar configurando tamaño de porción a un entero positivo.

Tenga en cuenta que puede causar un alto uso de memoria para iterables muy largos. Considere usar imap() o imap_unordered() con explícito tamaño de porción opción para una mejor eficiencia.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Una variante del map() método que devuelve un AsyncResult objeto.

Si llamar de vuelta se especifica, entonces debería ser un invocable que acepte un solo argumento. Cuando el resultado esté listo llamar de vuelta se le aplica, es decir, a menos que la llamada haya fallado, en cuyo caso el error_callback se aplica en su lugar.

Si error_callback se especifica, entonces debería ser un invocable que acepte un solo argumento. Si la función de destino falla, entonces la error_callback se llama con la instancia de excepción.

Las devoluciones de llamada deben completarse de inmediato, ya que de lo contrario se bloqueará el hilo que maneja los resultados.

imap(func, iterable[, chunksize])

Una versión más perezosa de map().

los tamaño de porción argumento es el mismo que el utilizado por el map() método. Para iterables muy largos usando un gran valor para tamaño de porción puede completar el trabajo mucho más rápido que usar el valor predeterminado de 1.

También si tamaño de porción es 1 entonces el next() método del iterador devuelto por el imap() El método tiene una opción se acabó el tiempo parámetro: next(timeout) elevará multiprocessing.TimeoutError si el resultado no se puede devolver dentro de se acabó el tiempo segundos.

imap_unordered(func, iterable[, chunksize])

Lo mismo que imap() excepto que el orden de los resultados del iterador devuelto debe considerarse arbitrario. (Solo cuando hay un solo proceso de trabajo se garantiza que el pedido es “correcto”).

starmap(func, iterable[, chunksize])

Igual que map() excepto que los elementos del iterable se espera que sean iterables que se descomprimen como argumentos.

Por lo tanto, un iterable de [(1,2), (3, 4)] resultados en [func(1,2),
func(3,4)]
.

Nuevo en la versión 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Una combinación de starmap() y map_async() que itera sobre iterable de iterables y llamadas func con los iterables desempaquetados. Devuelve un objeto de resultado.

Nuevo en la versión 3.3.

close()

Evita que se envíen más tareas al grupo. Una vez que se hayan completado todas las tareas, los procesos de trabajo se cerrarán.

terminate()

Detiene los procesos del trabajador inmediatamente sin completar el trabajo pendiente. Cuando el objeto de la piscina se recolecta como basura terminate() será llamado inmediatamente.

join()

Espere a que salgan los procesos de trabajo. Hay que llamar close() o terminate() antes de usar join().

Nuevo en la versión 3.3: Los objetos de grupo ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto. __enter__() devuelve el objeto de la piscina, y __exit__() llamadas terminate().

class multiprocessing.pool.AsyncResult

La clase del resultado devuelto por Pool.apply_async() y Pool.map_async().

get([timeout])

Devuelve el resultado cuando llegue. Si se acabó el tiempo no es None y el resultado no llega dentro se acabó el tiempo segundos entonces multiprocessing.TimeoutError es elevado. Si la llamada remota generó una excepción, entonces esa excepción será levantada por get().

wait([timeout])

Espere hasta que el resultado esté disponible o hasta que se acabó el tiempo Pasan los segundos.

ready()

Devuelve si la llamada se ha completado.

successful()

Devuelve si la llamada se completó sin generar una excepción. Elevará ValueError si el resultado no está listo.

Modificado en la versión 3.7: Si el resultado no está listo, ValueError se levanta en lugar de AssertionError.

El siguiente ejemplo demuestra el uso de una piscina:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Oyentes y Clientes

Por lo general, el paso de mensajes entre procesos se realiza usando colas o usando Connection objetos devueltos por Pipe().

sin embargo, el multiprocessing.connection El módulo permite cierta flexibilidad adicional. Básicamente, proporciona una API orientada a mensajes de alto nivel para tratar con sockets o canalizaciones con nombre de Windows. También tiene soporte para autenticación implícita utilizando el hmac módulo, y para sondear múltiples conexiones al mismo tiempo.

multiprocessing.connection.deliver_challenge(connection, authkey)

Envíe un mensaje generado aleatoriamente al otro extremo de la conexión y espere una respuesta.

Si la respuesta coincide con el resumen del mensaje usando Clave de autenticación como clave, se envía un mensaje de bienvenida al otro extremo de la conexión. De lo contrario AuthenticationError es elevado.

multiprocessing.connection.answer_challenge(connection, authkey)

Reciba un mensaje, calcule el resumen del mensaje usando Clave de autenticación como clave y, a continuación, envíe el resumen.

Si no se recibe un mensaje de bienvenida, entonces AuthenticationError es elevado.

multiprocessing.connection.Client(address[, family[, authkey]])

Intente establecer una conexión con el oyente que está usando la dirección Dirección, devolviendo un Connection.

El tipo de conexión está determinado por familia argumento, pero esto generalmente se puede omitir ya que generalmente se puede inferir del formato de Dirección. (Ver Formatos de dirección)

Si Clave de autenticación se proporciona y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza ninguna autenticación si Clave de autenticación es Ninguno. AuthenticationError se genera si falla la autenticación. Ver Claves de autenticación.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Un contenedor para un socket enlazado o una tubería con nombre de Windows que está ‘escuchando’ conexiones.

Dirección es la dirección que utilizará el conector vinculado o la tubería con nombre del objeto de escucha.

Nota

Si se utiliza una dirección de ‘0.0.0.0’, la dirección no será un punto final conectable en Windows. Si necesita un punto final conectable, debe usar ‘127.0.0.1’.

familia es el tipo de conector (o tubería con nombre) que se utilizará. Esta puede ser una de las cadenas 'AF_INET' (para un socket TCP), 'AF_UNIX' (para un socket de dominio Unix) o 'AF_PIPE' (para una tubería con nombre de Windows). De estos, solo se garantiza que el primero estará disponible. Si familia es None entonces la familia se infiere del formato de Dirección. Si Dirección es también None entonces se elige un valor predeterminado. Este valor predeterminado es la familia que se supone que es la más rápida disponible. Ver Formatos de dirección. Tenga en cuenta que si familia es 'AF_UNIX' y la dirección es None entonces el socket se creará en un directorio temporal privado creado usando tempfile.mkstemp().

Si el objeto de escucha usa un socket, entonces reserva (1 por defecto) se pasa al listen() método del zócalo una vez que se ha unido.

Si Clave de autenticación se proporciona y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza ninguna autenticación si Clave de autenticación es Ninguno. AuthenticationError se genera si falla la autenticación. Ver Claves de autenticación.

accept()

Acepte una conexión en el socket vinculado o la tubería con nombre del objeto de escucha y devuelva un Connection objeto. Si se intenta la autenticación y falla, entonces AuthenticationError es elevado.

close()

Cierre el conector vinculado o la tubería con nombre del objeto de escucha. Esto se llama automáticamente cuando el oyente es recolectado como basura. Sin embargo, es aconsejable llamarlo explícitamente.

Los objetos de escucha tienen las siguientes propiedades de solo lectura:

address

La dirección que está utilizando el objeto Listener.

last_accepted

La dirección de la que provino la última conexión aceptada. Si esto no está disponible, entonces es None.

Nuevo en la versión 3.3: Los objetos de escucha ahora admiten el protocolo de gestión de contexto; consulte Tipos de administrador de contexto. __enter__() devuelve el objeto de escucha y __exit__() llamadas close().

multiprocessing.connection.wait(object_list, timeout=None)

Espere hasta que entre un objeto lista_objetos está listo. Devuelve la lista de esos objetos en lista_objetos que están listos. Si se acabó el tiempo es un flotador, entonces la llamada se bloquea durante como máximo esa cantidad de segundos. Si se acabó el tiempo es None luego se bloqueará por un período ilimitado. Un tiempo de espera negativo equivale a un tiempo de espera cero.

Tanto para Unix como para Windows, un objeto puede aparecer en lista_objetos si esto es

  • un legible Connection objeto;
  • un conectado y legible socket.socket objeto; o
  • los sentinel atributo de un Process objeto.

Una conexión o un objeto de socket está listo cuando hay datos disponibles para leer de él, o el otro extremo se ha cerrado.

Unix: wait(object_list, timeout) casi equivalente select.select(object_list, [], [], timeout). La diferencia es que, si select.select() es interrumpido por una señal, puede aumentar OSError con un número de error de EINTR, mientras que wait() no.

Ventanas: Un elemento en lista_objetos debe ser un identificador entero que sea de espera (de acuerdo con la definición utilizada por la documentación de la función Win32 WaitForMultipleObjects()) o puede ser un objeto con un fileno() método que devuelve un identificador de enchufe o un identificador de tubería. (Tenga en cuenta que las manijas de las tuberías y las manijas no asas de espera.)

Nuevo en la versión 3.3.

Ejemplos de

El siguiente código de servidor crea un oyente que usa 'secret password' como clave de autenticación. Luego espera una conexión y envía algunos datos al cliente:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

El siguiente código se conecta al servidor y recibe algunos datos del servidor:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

El siguiente código usa wait() para esperar mensajes de varios procesos a la vez:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Formatos de dirección

  • Un 'AF_INET' dirección es una tupla del formulario (hostname, port) dónde nombre de host es una cuerda y Puerto es un entero.
  • Un 'AF_UNIX' dirección es una cadena que representa un nombre de archivo en el sistema de archivos.
  • Un 'AF_PIPE' dirección es una cadena de la forma r'.pipe{PipeName}'. Usar Client() para conectarse a una tubería con nombre en una computadora remota llamada Nombre del servidor uno debe usar una dirección del formulario r'ServerNamepipe{PipeName}' en lugar de.

Tenga en cuenta que cualquier cadena que comience con dos barras diagonales inversas se asume de forma predeterminada como una 'AF_PIPE' dirección en lugar de una 'AF_UNIX' Dirección.

Claves de autenticación

Cuando uno usa Connection.recv, los datos recibidos se eliminan automáticamente. Desafortunadamente, eliminar datos de una fuente que no es de confianza es un riesgo para la seguridad. Por lo tanto Listener y Client() utilizar el hmac módulo para proporcionar autenticación implícita.

Una clave de autenticación es una cadena de bytes que se puede considerar como una contraseña: una vez que se establece una conexión, ambos extremos exigirán una prueba de que el otro conoce la clave de autenticación. (Demostrar que ambos extremos usan la misma clave no no implica enviar la clave a través de la conexión).

Si se solicita autenticación pero no se especifica una clave de autenticación, el valor de retorno de current_process().authkey se utiliza (ver Process). Este valor será heredado automáticamente por cualquier Process objeto que crea el proceso actual. Esto significa que (de forma predeterminada) todos los procesos de un programa multiproceso compartirán una única clave de autenticación que se puede utilizar al configurar conexiones entre ellos.

También se pueden generar claves de autenticación adecuadas utilizando os.urandom().

Inicio sesión

Está disponible algún soporte para el registro. Sin embargo, tenga en cuenta que el logging El paquete no utiliza bloqueos compartidos de procesos, por lo que es posible (según el tipo de controlador) que se mezclen los mensajes de diferentes procesos.

multiprocessing.get_logger()

Devuelve el registrador utilizado por multiprocessing. Si es necesario, se creará uno nuevo.

Cuando se crea por primera vez, el registrador tiene nivel logging.NOTSET y ningún controlador predeterminado. Los mensajes enviados a este registrador no se propagarán por defecto al registrador raíz.

Tenga en cuenta que en los procesos secundarios de Windows solo heredarán el nivel del registrador del proceso principal; no se heredará ninguna otra personalización del registrador.

multiprocessing.log_to_stderr()

Esta función realiza una llamada a get_logger() pero además de devolver el registrador creado por get_logger, agrega un controlador que envía la salida a sys.stderr usando formato '[%(levelname)s/%(processName)s] %(message)s'.

A continuación se muestra una sesión de ejemplo con el registro activado:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Para obtener una tabla completa de los niveles de registro, consulte la logging módulo.

los multiprocessing.dummy módulo

multiprocessing.dummy replica la API de multiprocessing pero no es más que una envoltura alrededor del threading módulo.

En particular, el Pool función proporcionada por multiprocessing.dummy devuelve una instancia de ThreadPool, que es una subclase de Pool que admite todas las mismas llamadas a métodos, pero utiliza un grupo de subprocesos de trabajo en lugar de procesos de trabajo.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Un objeto de grupo de subprocesos que controla un grupo de subprocesos de trabajo a los que se pueden enviar trabajos. ThreadPool las instancias son totalmente compatibles con la interfaz Pool instancias, y sus recursos también deben administrarse correctamente, ya sea utilizando el grupo como administrador de contexto o llamando close() y terminate() a mano.

procesos es el número de subprocesos de trabajo que se utilizarán. Si procesos es None luego el número devuelto por os.cpu_count() se utiliza.

Si inicializador no es None luego cada proceso de trabajador llamará initializer(*initargs) cuando comienza.

diferente a Pool, maxtasksperchild y contexto no se puede proporcionar.

Nota

A ThreadPool comparte la misma interfaz que Pool, que está diseñado en torno a un conjunto de procesos y es anterior a la introducción de la concurrent.futures módulo. Como tal, hereda algunas operaciones que no tienen sentido para un grupo respaldado por subprocesos y tiene su propio tipo para representar el estado de los trabajos asincrónicos. AsyncResult, eso no es entendido por ninguna otra biblioteca.

Los usuarios generalmente deberían preferir usar concurrent.futures.ThreadPoolExecutor, que tiene una interfaz más simple que se diseñó en torno a subprocesos desde el principio y que devuelve concurrent.futures.Future instancias que son compatibles con muchas otras bibliotecas, incluidas asyncio.

Pautas de programación

Hay ciertas pautas y modismos que deben seguirse al usar multiprocessing.

Todos los métodos de inicio

Lo siguiente se aplica a todos los métodos de inicio.

Evite el estado compartido

En la medida de lo posible, se debe intentar evitar la transferencia de grandes cantidades de datos entre procesos.

Probablemente sea mejor ceñirse al uso de colas o conductos para la comunicación entre procesos en lugar de usar las primitivas de sincronización de nivel inferior.

Picklability

Asegúrese de que los argumentos de los métodos de los proxies se puedan seleccionar.

Seguridad de subprocesos de proxies

No utilice un objeto proxy de más de un hilo a menos que lo proteja con un candado.

(Nunca hay un problema con los diferentes procesos que utilizan el mismo apoderado.)

Unirse a procesos zombies

En Unix, cuando un proceso finaliza pero no se ha unido, se convierte en un zombi. Nunca debería haber muchos porque cada vez que comienza un nuevo proceso (o active_children() se llama) se unirán todos los procesos completados que aún no se hayan unido. También llamando a un proceso terminado Process.is_alive se unirá al proceso. Aun así, probablemente sea una buena práctica unirse explícitamente a todos los procesos que inicie.

Mejor heredar que encurtir / despegar

Al usar el Aparecer o tenedor métodos de inicio de muchos tipos desde multiprocessing deben ser decapables para que los procesos secundarios puedan usarlos. Sin embargo, generalmente se debe evitar enviar objetos compartidos a otros procesos utilizando canalizaciones o colas. En su lugar, debe organizar el programa de modo que un proceso que necesita acceso a un recurso compartido creado en otro lugar pueda heredarlo de un proceso anterior.

Evite la terminación de procesos

Utilizando el Process.terminate El método para detener un proceso puede provocar que cualquier recurso compartido (como bloqueos, semáforos, conductos y colas) que esté utilizando actualmente el proceso se rompa o no esté disponible para otros procesos.

Por lo tanto, probablemente sea mejor considerar solo el uso Process.terminate en procesos que nunca utilizan recursos compartidos.

Unirse a procesos que usan colas

Tenga en cuenta que un proceso que ha puesto elementos en una cola esperará antes de terminar hasta que todos los elementos almacenados en búfer sean alimentados por el subproceso “alimentador” a la tubería subyacente. (El proceso hijo puede llamar al Queue.cancel_join_thread método de la cola para evitar este comportamiento).

Esto significa que cada vez que use una cola, debe asegurarse de que todos los elementos que se han colocado en la cola se eliminarán eventualmente antes de unirse al proceso. De lo contrario, no puede estar seguro de que terminarán los procesos que han puesto elementos en la cola. Recuerde también que los procesos no demoníacos se unirán automáticamente.

Un ejemplo que se interbloqueará es el siguiente:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Una solución aquí sería intercambiar las dos últimas líneas (o simplemente eliminar el p.join() línea).

Pasar recursos explícitamente a procesos secundarios

En Unix usando el tenedor método de inicio, un proceso hijo puede hacer uso de un recurso compartido creado en un proceso padre utilizando un recurso global. Sin embargo, es mejor pasar el objeto como argumento al constructor del proceso hijo.

Además de hacer que el código sea (potencialmente) compatible con Windows y los otros métodos de inicio, esto también asegura que mientras el proceso hijo todavía esté vivo, el objeto no será recolectado como basura en el proceso padre. Esto puede ser importante si se libera algún recurso cuando el objeto se recolecta como basura en el proceso principal.

Entonces, por ejemplo

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

debería ser reescrito como

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Cuidado con reemplazar sys.stdin con un “archivo como objeto”

multiprocessing originalmente llamado incondicionalmente:

os.close(sys.stdin.fileno())

en el multiprocessing.Process._bootstrap() método: esto dio lugar a problemas con los procesos en proceso. Esto se ha cambiado a:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Lo que resuelve el problema fundamental de los procesos que chocan entre sí, lo que da como resultado un error de descriptor de archivo incorrecto, pero presenta un peligro potencial para las aplicaciones que reemplazan sys.stdin() con un “objeto similar a un archivo” con almacenamiento en búfer de salida. Este peligro es que si varios procesos llaman close() en este objeto similar a un archivo, podría dar como resultado que los mismos datos se viertan al objeto varias veces, lo que da como resultado la corrupción.

Si escribe un objeto similar a un archivo e implementa su propio almacenamiento en caché, puede hacerlo seguro para la bifurcación almacenando el pid cada vez que lo agrega al caché y descartando el caché cuando cambia el pid. Por ejemplo:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Para más información, ver bpo-5155, bpo-5313 y bpo-5331

los Aparecer y tenedor métodos de inicio

Hay algunas restricciones adicionales que no se aplican a la tenedor método de inicio.

Más picklability

Asegúrese de que todos los argumentos Process.__init__() son encurtidos. Además, si subclase Process a continuación, asegúrese de que las instancias sean seleccionables cuando el Process.start se llama al método.

Variables globales

Tenga en cuenta que si el código que se ejecuta en un proceso secundario intenta acceder a una variable global, entonces el valor que ve (si lo hay) puede no ser el mismo que el valor en el proceso principal en el momento en que Process.start fue llamado.

Sin embargo, las variables globales que son solo constantes de nivel de módulo no causan problemas.

Importación segura del módulo principal

Asegúrese de que el módulo principal pueda ser importado de forma segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados (como iniciar un nuevo proceso).

Por ejemplo, usando el Aparecer o tenedor El método de inicio que ejecuta el siguiente módulo fallaría con un RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

En su lugar, uno debería proteger el “punto de entrada” del programa utilizando if
__name__ == '__main__':
como sigue:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Los freeze_support() La línea se puede omitir si el programa se ejecutará normalmente en lugar de congelarse).

Esto permite que el intérprete de Python recién generado importe de forma segura el módulo y luego ejecute el foo() función.

Se aplican restricciones similares si se crea un grupo o administrador en el módulo principal.

Ejemplos de

Demostración de cómo crear y utilizar administradores y proxies personalizados:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Utilizando Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processesn' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + 
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('nt%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('nt%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Un ejemplo que muestra cómo usar colas para alimentar tareas a una colección de procesos de trabajo y recopilar los resultados:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % 
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()