Saltar al contenido

¿RxJava `Completable.andThen` no se ejecuta en serie?

La guía paso a paso o código que hallarás en este artículo es la solución más eficiente y válida que encontramos a tus dudas o dilema.

Solución:

el problema no es con andThen pero con la declaración Observable.just(mUser.name) en el interior andThen . Él just el operador intentará crear el observable inmediatamente, aunque lo emitirá solo después Completable.fromAction.

El problema aquí es que, al intentar crear el Observable usando solo , el mUser es null.

Solución : debe diferir la creación de String Observable hasta que se produzca una suscripción, hasta el flujo ascendente de andThen comienza la emisión.

En vez de andThen(Observable.just(mUser.name));

usar

 andThen(Observable.defer(() -> Observable.just(mUser.name)));

O

 andThen(Observable.fromCallable(() -> mUser.name));

No creo que la respuesta de @Sarath Kn sea 100% correcta. Sí just creará observable tan pronto como se llame, pero andThen todavía está llamando just en un momento inesperado.

podemos comparar andThen con flatMap para obtener una mejor comprensión. Aquí hay una prueba completamente ejecutable:

package com.example;

import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;

public class ExampleTest 

    @Test
    public void createsIntermediateObservable_AfterSubscribing() 
        Observable coldObservable = getObservableSource()
                .flatMap(integer -> getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating observable source
        Cold obs created... subscribing
        Emitting 1,2,3
        Creating intermediate observable
        Creating intermediate observable
        Creating intermediate observable
        Emitting complete notification

        IMPORTANT: see that intermediate observables are created AFTER subscribing
         */
    

    @Test
    public void createsIntermediateObservable_BeforeSubscribing() 
        Observable coldObservable = getCompletableSource()
                .andThen(getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating completable source
        Creating intermediate observable
        Cold obs created... subscribing
        Emitting complete notification

        IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
         */
    

    private Observable getObservableSource() 
        System.out.println("Creating observable source");
        return Observable.create(emitter -> 
            System.out.println("Emitting 1,2,3");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        );
    

    private Observable getIntermediateObservable() 
        System.out.println("Creating intermediate observable");
        return Observable.just("A");
    

    private Completable getCompletableSource() 
        System.out.println("Creating completable source");
        return Completable.create(emitter -> 
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        );
    

Puedes ver que cuando usamos flatmapla just se llama después suscribirse, lo cual tiene sentido. Si el observable intermedio dependiera de los ítems emitidos para la flatmap entonces, por supuesto, el sistema no puede crear el observable intermedio antes de la suscripción. Todavía no tendría ningún valor. Puedes imaginar que esto no funcionaría si flatmap llamado just antes de suscribirse:

.flatMap(integer -> getIntermediateObservable(integer))

lo raro es que andThen es capaz de crear su observable interno (es decir, llamar just) antes de suscribirse. Tiene sentido que puede hacer esto. La única cosa andThen va a recibir es una notificación completa, por lo que no hay razón para NO crear el observable intermedio temprano. El único problema es que no es el esperado comportamiento.

La solución de @Sarath Kn es correcta, pero por el motivo equivocado. si usamos defer podemos ver que las cosas funcionan como se esperaba:

@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() 
    Observable coldObservable = getCompletableSource()
            .andThen(Observable.defer(this::getIntermediateObservable))
            .subscribeOn(Schedulers.trampoline())
            .observeOn(Schedulers.trampoline());
    System.out.println("Cold obs created... subscribing");
    TestObserver testObserver = coldObservable.test();
    testObserver.awaitTerminalEvent();

    /*
    Resulting logs:

    Creating completable source
    Cold obs created... subscribing
    Emitting complete notification
    Creating intermediate observable

    IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
     */

Si estás contento con lo expuesto, tienes la libertad de dejar un ensayo acerca de qué le añadirías a esta noticia.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *