Saltar al contenido

¿Cómo enviar un progreso de operación en una aplicación FastAPI?

Nuestro equipo de redactores ha pasado mucho tiempo buscando la respuesta a tu búsqueda, te brindamos la soluciones por eso esperamos resultarte de gran ayuda.

Solución:

Enfoques

Votación

El enfoque más preferido para realizar un seguimiento del progreso de una tarea es el sondeo:

  1. Después de recibir un request para iniciar una tarea en un backend:
    1. Crear un task object en el almacenamiento (por ejemplo, en la memoria, redis y etc.). los task object debe contener los siguientes datos: task ID, status (pendiente, completado), result, y otros.
    2. Ejecute la tarea en segundo plano (corrutinas, subprocesos, multiprocesamiento, cola de tareas como Celery, arq, aio-pika, dramatiq y etc.)
    3. Responde inmediatamente la respuesta 202 (Accepted) devolviendo lo recibido previamente task ID.
  2. Actualizar el estado de la tarea:
    1. Esto puede ser desde la propia tarea, si conoce el almacén de tareas y tiene acceso a él. Periódicamente, la propia tarea actualiza información sobre sí misma.
    2. O use un monitor de tareas (Observer, producer-consumer patrón), que controlará el estado de la tarea y su resultado. Y también actualizará la información en el almacenamiento.
  3. Sobre el client side (front-end) empezar un ciclo de sondeo para el estado de la tarea al punto final /task/ID/status, que toma información del almacenamiento de tareas.

Respuesta de transmisión

La transmisión es una forma menos conveniente de obtener el estado de procesamiento de solicitudes periódicamente. Cuando empujamos gradualmente las respuestas sin cerrar la conexión. Tiene una serie de desventajas importantes, por ejemplo, si la conexión se interrumpe, puede perder información. Streaming Api es otro enfoque que REST Api.

Websockets

También puede utilizar websockets para notificaciones en tiempo real y comunicación bidireccional.

Enlaces:

  • Ejemplos de enfoque de sondeo para la barra de progreso y una descripción más detallada de django + celery se puede encontrar en estos enlaces:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • He proporcionado ejemplos simplificados de ejecución de tareas en segundo plano en FastAPI usando multiprocesamiento aquí:

https://stackoverflow.com/a/63171013/13782669

Respuesta anterior:

Puede ejecutar una tarea en segundo plano, devolver su id y proporcionar un /status punto final que el frente llamaría periódicamente. En la respuesta de estado, puede devolver el estado actual de su tarea (por ejemplo, pendiente con el número del archivo procesado actualmente). Proporcioné algunos ejemplos simples aquí.

Manifestación

Votación

Demostración del enfoque usando tareas asyncio (solución de un solo trabajador):

import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] =   # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/param", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/uid/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Ejemplo adaptado de bucle de la pregunta

La función de procesamiento en segundo plano se define como def y FastAPI lo ejecuta en el grupo de subprocesos.

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = 


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/uid/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Transmisión

async def process_files_gen(files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        yield f"i.filename processedn"
    yield f"OKn"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))

A continuación se muestra una solución que utiliza identificadores uniq y un diccionario disponible a nivel mundial que contiene información sobre los trabajos:

NOTA: El código siguiente es seguro de usar hasta que use dynamic keys valores (en uuid de muestra en uso) y mantenga la aplicación dentro de un solo proceso.

  1. Para iniciar la aplicación, cree un archivo main.py
  2. Correr uvicorn main:app --reload
  3. Cree una entrada de trabajo accediendo http://127.0.0.1:8000/
  4. Repita el paso 3 para crear varios trabajos
  5. Ir a http://127.0.0.1/status página para ver los estados de la página.
  6. Ir a http://127.0.0.1/status/identifier para ver la progresión del trabajo por el ID del trabajo.

Código de aplicación:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = 'jobs': 

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    pending_jobs[job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context[jobs][identifier] = 
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return "identifier": identifier


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = 
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return "identifier": identifier

@app.get('/status')
def status():
    return 
        'all': list(context['jobs'].values()),
    

@app.get('/status/identifier')
async def status(identifier):
    return 
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    

Calificaciones y reseñas

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *