Saltar al contenido

Spring WebFlux (Flux): cómo publicar dinámicamente

Es imprescindible comprender el código de forma correcta antes de usarlo a tu proyecto y si ttienes algo que aportar puedes dejarlo en los comentarios.

Solución:

Publicar “dinámicamente” usando FluxProcessor y FluxSink

Una de las técnicas para suministrar datos manualmente al Flux esta usando FluxProcessor#sink método como en el siguiente ejemplo

@SpringBootApplication
@RestController
public class DemoApplication 

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);

    

    public DemoApplication() 
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    

    @GetMapping("/send")
    public void test() 
        sink.next("Hello World #" + counter.getAndIncrement());
    

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux sse() 
        return processor.map(e -> ServerSentEvent.builder(e).build());
    

Aquí, creé DirectProcessor para admitir múltiples suscriptores, que escucharán el flujo de datos. Además, proporcioné información adicional FluxProcessor#serialize que brindan soporte seguro para multiproductor (invocación desde diferentes subprocesos sin violar las reglas de especificación de Reactive Streams, especialmente la regla 1.3). Finalmente llamando a “http://localhost:8080/send” veremos el mensaje Hello World #1 (por supuesto, solo en caso de que se haya conectado a “http://localhost:8080” anteriormente)

Actualización para reactor 3.4

Con Reactor 3.4 tienes una nueva API llamada reactor.core.publisher.Sinks. Sinks La API ofrece un generador fluido para el envío manual de datos que le permite especificar cosas como la cantidad de elementos en el flujo y el comportamiento de la contrapresión, la cantidad de suscriptores admitidos y las capacidades de reproducción:

@SpringBootApplication
@RestController
public class DemoApplication 

    final Sinks.Many sink;
    final AtomicLong counter;

    public static void main(String[] args) 
        SpringApplication.run(DemoApplication.class, args);

    

    public DemoApplication() 
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
        this.counter = new AtomicLong();
    

    @GetMapping("/send")
    public void test() 
        EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

        if (result.isFailure()) 
          // do something here, since emission failed
        
    

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux sse() 
        return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
    

Nota, envío de mensajes a través de Sinks API introduce un nuevo concepto de emission y su resultado. La razón de tal API es el hecho de que el Reactor extiende Reactive-Streams y tiene que seguir el control de contrapresión. dicho eso si tu emit más señales de las solicitadas y la implementación subyacente no admite el almacenamiento en búfer, su mensaje no se entregará. Por lo tanto, el resultado de tryEmitNext devuelve el EmitResult que indica si el mensaje fue enviado o no.

Además, tenga en cuenta que por defecto Sinsk API proporciona una versión serializada de Sink, lo que significa que no tiene que preocuparse por la concurrencia. Sin embargo, si sabe de antemano que la emisión del mensaje es en serie, puede construir un Sinks.unsafe() versión que no serializa mensajes dados

Solo otra idea, usar EmitterProcessor como puerta de entrada al flujo

    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    public class MyEmitterProcessor 
        EmitterProcessor emitterProcessor;

        public static void main(String args[]) 

            MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
            Flux publisher = myEmitterProcessor.getPublisher();
            myEmitterProcessor.onNext("A");
            myEmitterProcessor.onNext("B");
            myEmitterProcessor.onNext("C");
            myEmitterProcessor.complete();

            publisher.subscribe(x -> System.out.println(x));

        

        public Flux getPublisher() 
            emitterProcessor = EmitterProcessor.create();
            return emitterProcessor.map(x -> "consume: " + x);
         

        public  void onNext(String nextString) 
            emitterProcessor.onNext(nextString);
        

        public  void complete() 
            emitterProcessor.onComplete();
        
    

Más información, ver aquí desde Reactor doc. Hay una recomendación del propio documento de que “la mayoría de las veces, debe intentar evitar el uso de un procesador. Son más difíciles de usar correctamente y propensos a algunos casos de esquina”. PERO no sé qué tipo de caso de esquina.

Recuerda recomendar esta sección si si solucionó tu problema.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *