Presta atención ya que en este enunciado encontrarás la solución que buscas.
Solución:
from functools import wraps
from celery import shared_task
def skip_if_running(f):
task_name = f'f.__module__.f.__name__'
@wraps(f)
def wrapped(self, *args, **kwargs):
workers = self.app.control.inspect().active()
for worker, tasks in workers.items():
for task in tasks:
if (task_name == task['name'] and
tuple(args) == tuple(task['args']) and
kwargs == task['kwargs'] and
self.request.id != task['id']):
print(f'task task_name (args, kwargs) is running on worker, skipping')
return None
return f(self, *args, **kwargs)
return wrapped
@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
pass
test_single_task.delay()
La única forma de hacerlo es implementar una estrategia de bloqueo usted mismo:
Lea debajo de la sección aquí para la referencia.
Al igual que con cron, las tareas pueden superponerse si la primera tarea no se completa antes de la siguiente. Si eso le preocupa, debe usar una estrategia de bloqueo para garantizar que solo se pueda ejecutar una instancia a la vez (consulte, por ejemplo, Garantizar que una tarea solo se ejecute una a la vez).
Resolví el problema usando apio-una vez que extendí a apio-uno.
Ambos sirven para tu problema. Utiliza Redis para bloquear una tarea en ejecución. celery-one
también realizará un seguimiento de la tarea que se está bloqueando.
A continuación se muestra un ejemplo de uso muy simple para celery beat. En el código de abajo, slow_task
está programado cada 1 segundo, pero su tiempo de finalización es de 5 segundos. El apio normal programaría la tarea cada segundo incluso si ya se está ejecutando. celery-one
evitaría esto.
celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL
from datetime import timedelta
celery.conf.CELERYBEAT_SCHEDULE =
'add-every-30-seconds':
'task': 'tasks.slow_task',
'schedule': timedelta(seconds=1),
'args': (1,)
,
celery.conf.CELERY_TIMEZONE = 'UTC'
@celery.task(base=QueueOne, one_options='fail': False)
def slow_task(a):
print("Running")
sleep(5)
return "Done " + str(a)
Puntuaciones y reseñas
Eres capaz de añadir valor a nuestro contenido colaborando tu veteranía en las anotaciones.