Saltar al contenido

¿Cómo programar una función para que se ejecute cada hora en Flask?

Solución:

Puedes usar BackgroundScheduler() desde el paquete APScheduler (v3.5.3):

import time
import atexit

from apscheduler.schedulers.background import BackgroundScheduler


def print_date_time():
    print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))


scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()

# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

Tenga en cuenta que dos de estos programadores se iniciarán cuando Flask esté en modo de depuración. Para obtener más información, consulte esta pregunta.

Soy un poco nuevo con el concepto de programadores de aplicaciones, pero lo que encontré aquí para APScheduler v3.3.1 es algo un poco diferente. Creo que para las versiones más recientes, la estructura del paquete, los nombres de las clases, etc., han cambiado, por lo que estoy poniendo aquí una nueva solución que hice recientemente, integrada con una aplicación básica de Flask:

#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

def sensor():
    """ Function for test purposes. """
    print("Scheduler is alive!")

sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()

app = Flask(__name__)

@app.route("/home")
def home():
    """ Function for test purposes. """
    return "Welcome Home :) !"

if __name__ == "__main__":
    app.run()

También dejo este Gist aquí, si alguien tiene interés en las actualizaciones de este ejemplo.

Aquí hay algunas referencias, para lecturas futuras:

  • Documento de APScheduler: https://apscheduler.readthedocs.io/en/latest/
  • daemon = True: https://docs.python.org/3.4/library/threading.html#thread-objects

Podrías hacer uso de APScheduler en su aplicación Flask y ejecute sus trabajos a través de su interfaz:

import atexit

# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask

app = Flask(__name__)

cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()

@cron.interval_schedule(hours=1)
def job_function():
    # Do your work here


# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))

if __name__ == '__main__':
    app.run()
¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *