Saltar al contenido

Terminar un programa de multiprocesamiento de Python una vez que uno de sus trabajadores cumpla con una determinada condición

Nuestros mejores programadores han agotado sus reservas de café, buscando a tiempo completo por la solución, hasta que Rubén halló el arreglo en Bitbucket así que ahora la comparte con nosotros.

Solución:

Ningún proceso puede detener otro que no sea la fuerza bruta os.kill()-como mazos. No vayas allí.

Para hacer esto con sensatez, debe reelaborar su enfoque básico: el proceso principal y los procesos de trabajo deben comunicarse entre sí.

Lo desarrollaría, pero el ejemplo hasta ahora es también básico para que sea útil. Por ejemplo, tal como está escrito, no más de num_workers llamadas a rand() se han realizado alguna vez, por lo que no hay razón para creer que alguno de ellos deba ser> 0,7.

Una vez que la función de trabajo desarrolla un bucle, se vuelve más obvio. Por ejemplo, el trabajador podría verificar si un mp.Event se establece en la parte superior del bucle, y simplemente salga si lo está. El proceso principal establecería Event cuando quiere que los trabajadores se detengan.

Y un trabajador puede establecer una mp.Event cuando encontró un valor> 0,7. El proceso principal esperaría eso Event, luego configure el “tiempo para detener” Event para que los trabajadores lo vean, luego haga el ciclo habitual .join()-pidiendo a los trabajadores un cierre limpio.

EDITAR

Aquí está desarrollando una solución limpia y portátil, asumiendo que los trabajadores continuarán hasta que al menos uno encuentre un valor> 0.7. Tenga en cuenta que eliminé numpy de esto, porque es irrelevante para este código. El código aquí debería funcionar bien en cualquier Python estándar en cualquier plataforma compatible multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print "%d started" % i
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print '%d found %g' % (i, x)
            foundit.set()
            break
        sleep(0.1)
    print "%d is done" % i

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
    foundit.wait()
    quit.set()

Y algunos resultados de muestra:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

Todo se apaga limpiamente: sin rastreos, sin terminaciones anormales, sin procesos zombis dejados atrás … limpio como un silbido.

MATANDOLO

Como señaló @noxdafox, hay un Pool.terminate() método que hace lo mejor que puede, en todas las plataformas, para matar los procesos de trabajo sin importar lo que estén haciendo (por ejemplo, en Windows llama a la plataforma TerminateProcess()). No lo recomiendo para el código de producción, porque matar un proceso abruptamente puede dejar varios recursos compartidos en estados inconsistentes o dejar que se filtren. Hay varias advertencias al respecto en el multiprocessing docs, a los que debe agregar los documentos del sistema operativo.

Aún así, ¡puede ser conveniente! Aquí hay un programa completo que utiliza este enfoque. Tenga en cuenta que cambié el límite a 0,95, para que sea más probable que tarde más de un parpadeo en ejecutarse:

import random
from time import sleep

def worker(i):
    print "%d started" % i
    while True:
        x = random.random()
        print '%d found %g' % (i, x)
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print "quitting with %g" % arg
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

Y algunos resultados de muestra:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

Hay una forma mucho más limpia y pitónica de hacer lo que quieres hacer y se logra mediante el uso de las funciones de devolución de llamada que ofrece multiprocessing.Pool.

Puede consultar esta pregunta para ver un ejemplo de implementación.

Como mencionó uno de los otros usuarios, necesita que los procesos se comuniquen entre sí para que terminen con sus pares. Si bien puede usar os.kill para terminar los procesos de pares, es más elegante señalar una terminación.

La solución que utilicé es bastante simple: 1. averigüe el ID de proceso (pid) del proceso principal, que genera todos los demás procesos de trabajo. Esta información de conexión está disponible en el sistema operativo, que realiza un seguimiento de qué proceso secundario se generó a partir de qué proceso principal. 2. cuando uno de los procesos de trabajo alcanza su condición final, utiliza el ID del proceso principal para encontrar todos los procesos secundarios del proceso principal (incluido él mismo), luego revisa la lista y les indica que finalicen (asegurándose de que no sea así) señalización a sí mismo) El siguiente código contiene la solución de trabajo.

import time
import numpy as np
import multiprocessing as mp
import time
import sys
import os
import psutil
import signal

pid_array = []

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print "From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process
    if res>0.7:
        print "find it"
        # solution: use the parent child connection between processes
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            if not (process.pid == current_process):
                print "Process: ",current_process,  " killed process: ", process.pid
                process.send_signal(signal.SIGTERM)


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    main_process = os.getpid()
    print "Main process: ", main_process
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

El resultado da una idea clara de lo que está sucediendo:

Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266

Puntuaciones y comentarios

Si te sientes incitado, tienes la libertad de dejar un tutorial acerca de qué le añadirías a esta reseña.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *