Saltar al contenido

Programación de Python Script para que se ejecute cada hora con precisión

Posterior a mirar en diversos repositorios y páginas al terminar dimos con la respuesta que te mostramos a continuación.

Solución:

Tal vez esto pueda ayudar: Programador avanzado de Python

Aquí hay un pequeño fragmento de código de su documentación:

from apscheduler.schedulers.blocking import BlockingScheduler

def some_job():
    print "Decorated job"

scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()

Para ejecutar algo cada 10 minutos después de la hora.

from datetime import datetime, timedelta

while 1:
    print 'Run something..'

    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10)

    while datetime.now() < dt:
        time.sleep(1)

Para apscheduler <3.0, vea la respuesta de Unknown.

Para apscheduler > 3.0

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()

Actualizar:

apscheduler documentación.

Esto para apscheduler-3.3.1 en Python 3.6.2.

"""
Following configurations are set for the scheduler:

 - a MongoDBJobStore named “mongo”
 - an SQLAlchemyJobStore named “default” (using SQLite)
 - a ThreadPoolExecutor named “default”, with a worker count of 20
 - a ProcessPoolExecutor named “processpool”, with a worker count of 5
 - UTC as the scheduler’s timezone
 - coalescing turned off for new jobs by default
 - a default maximum instance limit of 3 for new jobs
"""

from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

"""
Method 1:
"""
jobstores = 
    'mongo': 'type': 'mongodb',
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')

executors = 
    'default': 'type': 'threadpool', 'max_workers': 20,
    'processpool': ProcessPoolExecutor(max_workers=5)

job_defaults = 
    'coalesce': False,
    'max_instances': 3


"""
Method 2 (ini format):
"""
gconfig = 
    'apscheduler.jobstores.mongo': 
        'type': 'mongodb'
    ,
    'apscheduler.jobstores.default': 
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    ,
    'apscheduler.executors.default': 
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    ,
    'apscheduler.executors.processpool': 
        'type': 'processpool',
        'max_workers': '5'
    ,
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',


sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format


@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')


@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')


sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()

sched_method2.configure(gconfig=gconfig)
sched_method2.start()

Te mostramos comentarios y calificaciones

Al final de todo puedes encontrar las notas de otros desarrolladores, tú también tienes el poder mostrar el tuyo si te gusta.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *