Saltar al contenido

Python: ¿cómo ejecutar varias corrutinas al mismo tiempo usando asyncio?

Si te encuentras con alguna parte que no comprendes puedes comentarlo y trataremos de ayudarte lo más rápido posible.

Solución:

TL;DR Utilizar asyncio.ensure_future() para ejecutar varias rutinas al mismo tiempo.


¿Quizás este escenario requiere un marco basado en eventos/devoluciones de llamada en lugar de uno basado en rutinas? ¿Tornado?

No, no necesita ningún otro marco para esto. La idea general de la aplicación asíncrona frente a la síncrona es que no se bloquea, mientras espera el resultado. No importa cómo se implemente, usando corrutinas o devoluciones de llamada.

Quiero decir, debido a que connection_handler está constantemente esperando mensajes entrantes, el servidor solo puede tomar medidas después de haber recibido un mensaje del cliente, ¿verdad? ¿Que me estoy perdiendo aqui?

En la aplicación síncrona escribirás algo como msg = websocket.recv(), que bloquearía toda la aplicación hasta que reciba el mensaje (como lo describió). Pero en la aplicación asíncrona es completamente diferente.

Cuando tu lo hagas msg = yield from websocket.recv() dices algo como: suspender la ejecución de connection_handler() Hasta que websocket.recv() producirá algo. Utilizando yield from dentro de coroutine devuelve el control al bucle de eventos, por lo que se puede ejecutar otro código, mientras esperamos el resultado de websocket.recv(). Consulte la documentación para comprender mejor cómo funcionan las corrutinas.

Digamos que, además, queremos enviar un mensaje al cliente cada vez que ocurre algún evento. Para simplificar, enviemos un mensaje periódicamente cada 60 segundos. ¿Cómo haríamos eso?

Puedes usar asyncio.async() para ejecutar tantas rutinas como desee, antes de ejecutar la llamada de bloqueo para iniciar el ciclo de eventos.

import asyncio

import websockets

# here we'll store all active connections to use for sending periodic messages
connections = []


@asyncio.coroutine
def connection_handler(connection, path):
    connections.append(connection)  # add connection to pool
    while True:
        msg = yield from connection.recv()
        if msg is None:  # connection lost
            connections.remove(connection)  # remove connection from pool, when client disconnects
            break
        else:
            print('< '.format(msg))
        yield from connection.send(msg)
        print('> '.format(msg))


@asyncio.coroutine
def send_periodically():
    while True:
        yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
        for connection in connections:
            print('> Periodic event happened.')
            yield from connection.send('Periodic event happened.')  # send message to each connected client


start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
asyncio.get_event_loop().run_forever()

Aquí hay un ejemplo de implementación del cliente. Le pide que ingrese el nombre, lo recibe del servidor de eco, espera dos mensajes más del servidor (que son nuestros mensajes periódicos) y cierra la conexión.

import asyncio

import websockets


@asyncio.coroutine
def hello():
    connection = yield from websockets.connect('ws://localhost:8000/')
    name = input("What's your name? ")
    yield from connection.send(name)
    print("> ".format(name))
    for _ in range(3):
        msg = yield from connection.recv()
        print("< ".format(msg))

    yield from connection.close()


asyncio.get_event_loop().run_until_complete(hello())

Puntos importantes:

  1. En Python 3.4.4 asyncio.async() fue renombrado a asyncio.ensure_future().
  2. Existen métodos especiales para programar llamadas retrasadas, pero no funcionan con rutinas.

Estoy sorprendido gather no se menciona.

De la documentación de Python:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task name: Compute factorial(i)...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task name: factorial(number) = f")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

El mismo problema, difícilmente puedo obtener una solución hasta que vi la muestra perfecta aquí: http://websockets.readthedocs.io/en/stable/intro.html#both

 done, pending = await asyncio.wait(
        [listener_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED)  # Important

Por lo tanto, puedo manejar tareas de varias corrutinas, como el latido del corazón y la suscripción a redis.

Valoraciones y reseñas

Nos puedes respaldar nuestro quehacer añadiendo un comentario o dejando una puntuación te damos las gracias.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *