Nuestro equipo de redactores ha pasado mucho tiempo buscando la respuesta a tu búsqueda, te brindamos la soluciones por eso esperamos resultarte de gran ayuda.
Solución:
Enfoques
Votación
El enfoque más preferido para realizar un seguimiento del progreso de una tarea es el sondeo:
- Después de recibir un
request
para iniciar una tarea en un backend:- Crear un
task object
en el almacenamiento (por ejemplo, en la memoria,redis
y etc.). lostask object
debe contener los siguientes datos:task ID
,status
(pendiente, completado),result
, y otros. - Ejecute la tarea en segundo plano (corrutinas, subprocesos, multiprocesamiento, cola de tareas como
Celery
,arq
,aio-pika
,dramatiq
y etc.) - Responde inmediatamente la respuesta
202 (Accepted)
devolviendo lo recibido previamentetask ID
.
- Crear un
- Actualizar el estado de la tarea:
- Esto puede ser desde la propia tarea, si conoce el almacén de tareas y tiene acceso a él. Periódicamente, la propia tarea actualiza información sobre sí misma.
- O use un monitor de tareas (
Observer
,producer-consumer
patrón), que controlará el estado de la tarea y su resultado. Y también actualizará la información en el almacenamiento.
- Sobre el
client side
(front-end
) empezar un ciclo de sondeo para el estado de la tarea al punto final/task/ID/status
, que toma información del almacenamiento de tareas.
Respuesta de transmisión
La transmisión es una forma menos conveniente de obtener el estado de procesamiento de solicitudes periódicamente. Cuando empujamos gradualmente las respuestas sin cerrar la conexión. Tiene una serie de desventajas importantes, por ejemplo, si la conexión se interrumpe, puede perder información. Streaming Api es otro enfoque que REST Api.
Websockets
También puede utilizar websockets para notificaciones en tiempo real y comunicación bidireccional.
Enlaces:
- Ejemplos de enfoque de sondeo para la barra de progreso y una descripción más detallada de
django + celery
se puede encontrar en estos enlaces:
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
- He proporcionado ejemplos simplificados de ejecución de tareas en segundo plano en FastAPI usando multiprocesamiento aquí:
https://stackoverflow.com/a/63171013/13782669
Respuesta anterior:
Puede ejecutar una tarea en segundo plano, devolver su id
y proporcionar un /status
punto final que el frente llamaría periódicamente. En la respuesta de estado, puede devolver el estado actual de su tarea (por ejemplo, pendiente con el número del archivo procesado actualmente). Proporcioné algunos ejemplos simples aquí.
Manifestación
Votación
Demostración del enfoque usando tareas asyncio (solución de un solo trabajador):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/param", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/uid/status")
async def status_handler(uid: UUID):
return jobs[uid]
Ejemplo adaptado de bucle de la pregunta
La función de procesamiento en segundo plano se define como def
y FastAPI lo ejecuta en el grupo de subprocesos.
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] =
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/uid/status")
async def status_handler(uid: UUID):
return jobs[uid]
Transmisión
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"i.filename processedn"
yield f"OKn"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))
A continuación se muestra una solución que utiliza identificadores uniq y un diccionario disponible a nivel mundial que contiene información sobre los trabajos:
NOTA: El código siguiente es seguro de usar hasta que use dynamic keys valores (en uuid de muestra en uso) y mantenga la aplicación dentro de un solo proceso.
- Para iniciar la aplicación, cree un archivo
main.py
- Correr
uvicorn main:app --reload
- Cree una entrada de trabajo accediendo
http://127.0.0.1:8000/
- Repita el paso 3 para crear varios trabajos
- Ir a
http://127.0.0.1/status
página para ver los estados de la página. - Ir a
http://127.0.0.1/status/identifier
para ver la progresión del trabajo por el ID del trabajo.
Código de aplicación:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = 'jobs':
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] =
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return "identifier": identifier
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] =
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return "identifier": identifier
@app.get('/status')
def status():
return
'all': list(context['jobs'].values()),
@app.get('/status/identifier')
async def status(identifier):
return
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),