Saltar al contenido

Romper la función después de cierto tiempo

Luego de de una prolongada compilación de datos dimos con la solución este conflicto que tienen algunos los lectores. Te regalamos la respuesta y esperamos servirte de mucha apoyo.

Solución:

Creo que crear un nuevo proceso puede ser una exageración. Si está en Mac o en un sistema basado en Unix, debería poder usar signal.SIGALRM para forzar el tiempo de espera de las funciones que toman demasiado tiempo. Esto funcionará en funciones que están inactivas para la red u otros problemas que no puede manejar modificando su función. Tengo un ejemplo de cómo usarlo en esta respuesta:

¿Opción para que SSH expire el tiempo de espera después de un breve período de tiempo? ClientAlive y ConnectTimeout no parecen hacer lo que necesito que hagan

Editando mi respuesta aquí, aunque no estoy seguro de que deba hacer eso:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it's sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)

Básicamente, esto establece un temporizador de 5 segundos y luego intenta ejecutar su código. Si no se completa antes de que acabe el tiempo, se envía un SIGALRM, que capturamos y convertimos en una TimeoutException. Eso te obliga al bloque excepto, donde tu programa puede continuar.

Si puede dividir su trabajo y verificarlo de vez en cuando, esa es casi siempre la mejor solución. Pero a veces eso no es posible; por ejemplo, tal vez esté leyendo un archivo de un recurso compartido de archivos lento que de vez en cuando se cuelga durante 30 segundos. Para lidiar con eso internamente, tendría que reestructurar todo su programa alrededor de un bucle de E / S asíncrono.

Si no necesita ser multiplataforma, puede usar señales en * nix (incluyendo Mac y Linux), APC en Windows, etc. Pero si necesita ser multiplataforma, eso no funciona.

Entonces, si realmente necesita hacerlo al mismo tiempo, puede hacerlo y, a veces, debe hacerlo. En ese caso, probablemente desee utilizar un proceso para esto, no un hilo. Realmente no puede matar un hilo de forma segura, pero puede matar un proceso, y puede ser tan seguro como desee. Además, si el hilo tarda más de 5 segundos porque está vinculado a la CPU, no querrás pelear con él por el GIL.

Aquí hay dos opciones básicas.


Primero, puede poner el código en otro script y ejecutarlo con subprocess:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)

Dado que esto pasa por los canales normales de proceso infantil, la única comunicación que puede utilizar es argv cadenas, un valor de retorno de éxito / fracaso (en realidad, un número entero pequeño, pero eso no es mucho mejor) y, opcionalmente, un trozo de texto entrando y saliendo un trozo de texto.


Alternativamente, puede usar multiprocessing para generar un proceso hijo similar a un hilo:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()

Como puede ver, esto es un poco más complicado, pero es mejor de varias maneras:

  • Puede pasar objetos de Python arbitrarios (al menos cualquier cosa que pueda ser decapada) en lugar de solo cadenas.
  • En lugar de tener que poner el código de destino en un script completamente independiente, puede dejarlo como una función en el mismo script.
  • Es más flexible, por ejemplo, si más adelante necesita, digamos, pasar actualizaciones de progreso, es muy fácil agregar una cola en una o ambas direcciones.

El gran problema con cualquier tipo de paralelismo es compartir datos mutables, por ejemplo, tener una tarea en segundo plano que actualice un diccionario global como parte de su trabajo (lo que sus comentarios dicen que está tratando de hacer). Con los hilos, puede salirse con la suya, pero las condiciones de carrera pueden llevar a datos corruptos, por lo que debe tener mucho cuidado con el bloqueo. Con los procesos secundarios, no puede salirse con la suya en absoluto. (Sí, puede usar la memoria compartida, como explica el estado de Compartir entre procesos, pero esto se limita a tipos simples como números, matrices fijas y tipos que sabe cómo definir como estructuras C, y simplemente le devuelve a los mismos problemas como hilos.)


Idealmente, organiza las cosas de modo que no necesite compartir ningún dato mientras se ejecuta el proceso; dict como parámetro y obtener un dict como resultado. Esto suele ser bastante fácil de organizar cuando tiene una función previamente sincrónica que desea poner en segundo plano.

Pero, ¿qué pasa si, digamos, un resultado parcial es mejor que ningún resultado? En ese caso, la solución más simple es pasar los resultados a través de una cola. Puede hacer esto con una cola explícita, como se explica en Intercambio de objetos entre procesos, pero hay una forma más sencilla.

Si puede dividir el proceso monolítico en tareas separadas, una para cada valor (o grupo de valores) que desea incluir en el diccionario, puede programarlas en un Pool—O, mejor aún, un concurrent.futures.Executor. (Si está en Python 2.xo 3.1, consulte el backport futures en PyPI.)

Digamos que su función lenta se ve así:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count

En cambio, harías esto:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count

Todos los resultados que obtenga en 5 segundos se agregarán al dict; si no son todos, el resto se abandonan, y un TimeoutError se genera (que puede manejar como quiera: regístrelo, haga un código de respaldo rápido, lo que sea).

Y si las tareas son realmente independientes (como lo son en mi pequeño ejemplo estúpido, pero por supuesto que pueden no estar en su código real, al menos no sin un rediseño importante), puede paralelizar el trabajo de forma gratuita simplemente eliminando eso. max_workers=1. Luego, si lo ejecuta en una máquina de 8 núcleos, iniciará a 8 trabajadores y les dará cada 1/8 del trabajo por hacer, y las cosas se harán más rápido. (Por lo general, no 8 veces más rápido, pero a menudo entre 3 y 6 veces más rápido, lo que sigue siendo bastante bueno).

Tal vez alguien encuentre útil este decorador, según la respuesta de TheSoundDefense:

import time
import signal

class TimeoutException(Exception):   # Custom exception class
    pass


def break_after(seconds=2):
    def timeout_handler(signum, frame):   # Custom signal handler
        raise TimeoutException
    def function(function):
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                res = function(*args, **kwargs)
                signal.alarm(0)      # Clear alarm
                return res
            except TimeoutException:
                print u'Oops, timeout: %s sec reached.' % seconds, function.__name__, args, kwargs
            return
        return wrapper
    return function

Prueba:

@break_after(3)
def test(a, b, c):
    return time.sleep(10)

>>> test(1,2,3)
Oops, timeout: 3 sec reached. test (1, 2, 3) 

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *