Saltar al contenido

Celery Beat: limite a una sola instancia de tarea a la vez

Presta atención ya que en este enunciado encontrarás la solución que buscas.

Solución:

from functools import wraps
from celery import shared_task


def skip_if_running(f):
    task_name = f'f.__module__.f.__name__'

    @wraps(f)
    def wrapped(self, *args, **kwargs):
        workers = self.app.control.inspect().active()

        for worker, tasks in workers.items():
            for task in tasks:
                if (task_name == task['name'] and
                        tuple(args) == tuple(task['args']) and
                        kwargs == task['kwargs'] and
                        self.request.id != task['id']):
                    print(f'task task_name (args, kwargs) is running on worker, skipping')

                    return None

        return f(self, *args, **kwargs)

    return wrapped


@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
    pass


test_single_task.delay()

La única forma de hacerlo es implementar una estrategia de bloqueo usted mismo:

Lea debajo de la sección aquí para la referencia.

Al igual que con cron, las tareas pueden superponerse si la primera tarea no se completa antes de la siguiente. Si eso le preocupa, debe usar una estrategia de bloqueo para garantizar que solo se pueda ejecutar una instancia a la vez (consulte, por ejemplo, Garantizar que una tarea solo se ejecute una a la vez).

Resolví el problema usando apio-una vez que extendí a apio-uno.

Ambos sirven para tu problema. Utiliza Redis para bloquear una tarea en ejecución. celery-one también realizará un seguimiento de la tarea que se está bloqueando.

A continuación se muestra un ejemplo de uso muy simple para celery beat. En el código de abajo, slow_task está programado cada 1 segundo, pero su tiempo de finalización es de 5 segundos. El apio normal programaría la tarea cada segundo incluso si ya se está ejecutando. celery-one evitaría esto.

celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL

from datetime import timedelta

celery.conf.CELERYBEAT_SCHEDULE = 
    'add-every-30-seconds': 
        'task': 'tasks.slow_task',
        'schedule': timedelta(seconds=1),
        'args': (1,)
    ,


celery.conf.CELERY_TIMEZONE = 'UTC'


@celery.task(base=QueueOne, one_options='fail': False)
def slow_task(a):
    print("Running")
    sleep(5)
    return "Done " + str(a)

Puntuaciones y reseñas

Eres capaz de añadir valor a nuestro contenido colaborando tu veteranía en las anotaciones.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *