Saltar al contenido

¿Cómo escribo una secuencia de promesas en Python?

Este post ha sido probado por nuestros especialistas para que tengas la garantía de la veracidad de esta sección.

Solución:

Aquí hay un programa similar que usa asyncio y el async/await Synthax:

import asyncio
import random

async def alpha(x):
    await asyncio.sleep(0.2)
    return x + 1 

async def bravo(x):
    await asyncio.sleep(0.2)
    return random.randint(0, 1000) + x

async def charlie(x):
    if x % 2 == 0:
        return x
    raise ValueError(x, 'is odd')

async def run():
    try:
        number = await charlie(await bravo(await alpha(42)))
    except ValueError as exc:
        print('error:', exc.args[0])
    else:
        print('success:', number)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()

EDITAR: Si está interesado en flujos reactivos, podría considerar usar aiostream.

He aquí un ejemplo sencillo:

import asyncio
from aiostream import stream, pipe

async def main():
    # This stream computes 11² + 13² in 1.5 second
    xs = (
        stream.count(interval=0.1)      # Count from zero every 0.1 s
        | pipe.skip(10)                 # Skip the first 10 numbers
        | pipe.take(5)                  # Take the following 5
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers
        | pipe.map(lambda x: x ** 2)    # Square the results
        | pipe.accumulate()             # Add the numbers together
    )
    print('11² + 13² = ', await xs)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Más ejemplos en la documentación.

Descargo de responsabilidad: soy el responsable del proyecto.

Estás de suerte, Python 3.4 y superior incluyen asyncio, aunque la función que está buscando (Futuro) está disponible en Python 3.5 y versiones posteriores.

Desde su propio enlace sobre asyncio: “Esta versión solo es relevante para Python 3.3, que no incluye asyncio en su stdlib”.

Ejemplo:

import asyncio


async def some_coroutine():
    await asyncio.sleep(1)
    return 'done'


def process_result(future):
    print('Task returned:', future.result())


loop = asyncio.get_event_loop()
task = loop.create_task(some_coroutine())
task.add_done_callback(process_result)
loop.run_until_complete()

Aquí puedes ver las comentarios y valoraciones de los lectores

Si tienes alguna vacilación y capacidad de mejorar nuestro crónica eres capaz de dejar una explicación y con deseo lo estudiaremos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *