Saltar al contenido

RxJava Observando el subproceso de llamada/suscripción

Hola usuario de nuestra página, hallamos la respuesta a lo que buscabas, desplázate y la hallarás un poco más abajo.

Solución:

Para responder a tu pregunta, déjame empezar desde el principio, esto permite que otras personas entiendan lo que tú ya sabes.

programadores

Los planificadores desempeñan el mismo papel que los ejecutores de Java. Brevemente, deciden qué acciones de hilo se ejecutan.

Por lo general, un Observable y los operadores se ejecutan en el hilo actual. A veces puede pasar Scheduler a Observable u operator como parámetro (por ejemplo, Observable.timer()).

Además, RxJava proporciona 2 operadores para especificar Scheduler:

  • subscribeOn: especifique el Programador en el que operará un Observable
  • observeOn: especifique el Programador en el que un observador observará este Observable

Para entenderlos rápidamente, uso un código de ejemplo:

En todas las muestras, usaré el asistente createObservable, que emite un nombre de hilo en el que opera el Observable:

 public static Observable createObservable()
        return Observable.create((Subscriber subscriber) -> 
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            
        );
    

Sin planificadores:

createObservable().subscribe(message -> 
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    );
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

Suscríbete en:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

SubscribeOn y ObserveOn:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

Observar en:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

Responder:

AndroidSchedulers.mainThread() devuelve un programador que delega el trabajo a MessageQueue asociado con el hilo principal.
Para ello utiliza android.os.Looper.getMainLooper() y android.os.Handler.

En otras palabras, si desea especificar un subproceso en particular, debe proporcionar medios para programar y realizar tareas en el subproceso.

Debajo, puede usar cualquier tipo de MQ para almacenar tareas y lógica que hace un bucle en Quee y ejecuta tareas.

En java, tenemos Executor, que está designado para tales tareas. RxJava puede crear fácilmente un Programador desde dicho Ejecutor.

A continuación se muestra un ejemplo que muestra cómo puede observar el hilo principal (no es particularmente útil, pero muestra todas las partes requeridas).

public class RunCurrentThread implements Executor 

    private BlockingQueue tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException 
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    

    private void observerOnMain() 
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> 
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                );
        ;
    

    public Observable createObservable() 
        return Observable.create((Subscriber subscriber) -> 
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                
        );
    

    private void runLoop() throws InterruptedException 
        while(!Thread.interrupted())
            tasks.take().run();
        
    

    @Override
    public void execute(Runnable command) 
        tasks.add(command);
    

Y la última pregunta, por qué su código no termina:

ThreadPoolExecutor utiliza hilos que no son demonios por defecto, por lo que su programa no finaliza hasta que existen. Debe usar el método de apagado para cerrar los hilos.

Aquí hay un ejemplo simplificado actualizado para RxJava 2. Es el mismo concepto que la respuesta de Marek: un Ejecutor que agrega los ejecutables a un BlockingQueue que se consume en el subproceso de la persona que llama.

public class ThreadTest 

    @Test
    public void test() throws InterruptedException 

        final BlockingQueue tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable() 
            @Override
            public Integer call() throws Exception 
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            
        )
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() 
                @Override
                public void execute(@NonNull Runnable runnable) 
                    tasks.add(runnable);
                
            ))
            .subscribe(new Consumer() 
                @Override
                public void accept(@NonNull Integer integer) throws Exception 
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                
            );
        tasks.take().run();
    



// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main

Al final de todo puedes encontrar las notas de otros usuarios, tú incluso tienes la libertad de dejar el tuyo si dominas el tema.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *