Saltar al contenido

¿Cómo monitorear el número de suscripciones RXJS?

Te doy la bienvenida a nuestra página, en este sitio encontrarás la solucíon de lo que andabas buscando.

Solución:

Puede lograrlo usando aplazar para rastrear suscripciones y finalizar para rastrear finalizaciones, por ejemplo, como operador:

// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) 
  return function refCountOperatorFunction(source$) 
    let counter = 0;

    return defer(()=>
      counter++;
      onCountUpdate(counter);
      return source$;
    )
    .pipe(
      finalize(()=>
        counter--;
        onCountUpdate(counter);
      )
    );
  ;


// just a stub for `onCountUpdate`
function noop()

Y luego úsalo como:

const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

Aquí hay un fragmento de código que ilustra esto:

const  Subject, of, timer, pipe, defer  = rxjs;
const  finalize, takeUntil  = rxjs.operators;


const source$ = new Subject();

const result$ = source$.pipe(
  customOperator( n => console.log('Count updated: ', n) )
);

// emit events
setTimeout(()=>
  source$.next('one');
, 250);

setTimeout(()=>
  source$.next('two');
, 1000);

setTimeout(()=>
  source$.next('three');
, 1250);

setTimeout(()=>
  source$.next('four');
, 1750);


// subscribe and unsubscribe
const subscriptionA = result$
  .subscribe(value => console.log('A', value));

setTimeout(()=>
  result$.subscribe(value => console.log('B', value));
, 500);


setTimeout(()=>
  result$.subscribe(value => console.log('C', value));
, 1000);

setTimeout(()=>
  subscriptionA.unsubscribe();
, 1500);


// complete source
setTimeout(()=>
  source$.complete();
, 2000);


function customOperator(onCountUpdate = noop) 
  return function refCountOperatorFunction(source$) 
    let counter = 0;

    return defer(()=>
      counter++;
      onCountUpdate(counter);
      return source$;
    )
    .pipe(
      finalize(()=>
        counter--;
        onCountUpdate(counter);
      )
    );
  ;


function noop()

* NOTA: si su fuente $ está fría, es posible que deba compartirla.

Espero eso ayude

Realmente está haciendo tres preguntas distintas aquí, y me pregunto si realmente necesita la capacidad completa que menciona. Dado que la mayor parte de gestión de recursos La biblioteca ya proporciona las cosas que solicita, por lo que hacer un código de seguimiento personalizado parece ser redundante. Las dos primeras preguntas:

  • Asignar recursos globales cuando el número de suscripciones sea mayor que 0
  • Liberar recurso global cuando el número de suscripciones sea 0

Se puede hacer con el using + share operadores:

class ExpensiveResource 
  constructor () 
    // Do construction
  
  unsubscribe () 
   // Do Tear down
  


// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
  // Creates an expensive resource
  () => new ExpensiveResource(), 
  // Passes that expensive resource to an Observable factory function
  er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())

Después sharedStream$ se puede pasar como un flujo base que administrará el recurso subyacente (asumiendo que implementó su unsubscribe correctamente) para que el recurso se cree y elimine a medida que el número de suscriptores cambia entre 0 y 1.

  • Ajustar la estrategia de uso de recursos según la cantidad de suscripciones

    La tercera pregunta sobre la que tengo más dudas, pero la responderé para que esté completa, suponiendo que conozca su aplicación mejor que yo (ya que no puedo pensar en una razón por la que necesitaría un manejo específico en diferentes niveles de uso además de ir entre 0 y 1).

Básicamente, usaría un enfoque similar al anterior, pero encapsularía la lógica de transición de manera ligeramente diferente.

// Same as above
class ExpensiveResource 
  unsubscribe()   console.log('Tear down this resource!')


const usingReferenceTracking = 
  (onUp, onDown) => (resourceFactory, streamFactory) =>  r$)
      // Use finalize to handle the "onFinish" side-effects
      .pipe(finalize(() => onDown(instance, refCount -= 1)))



const referenceTracked$ = usingReferenceTracking(
  (ref, count) => console.log('Ref count increased to ' + count),
  (ref, count) => console.log('Ref count decreased to ' + count)
)(
  () => new ExpensiveResource(),
  ref => timer(1000)
)

referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))


// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!

Advertencia: Un efecto secundario de esto es que, por definición, el arroyo estará caliente una vez que salga del usingReferenceTracking función, y se activará en la primera suscripción. Asegúrese de tener esto en cuenta durante la fase de suscripción.

¡Qué problema tan divertido! Si entiendo lo que está preguntando, aquí está mi solución a esto: cree una clase contenedora alrededor de Observable que rastree las suscripciones interceptando ambos subscribe() y unsubscribe(). Aquí está la clase contenedora:

export class CountSubsObservable extends Observable
  private _subCount = 0;
  private _subCount$: BehaviorSubject = new BehaviorSubject(0);
  public subCount$ = this._subCount$.asObservable();

  constructor(public source: Observable) 
    super();
  

  subscribe(
    observerOrNext?: PartialObserver 

Esta envoltura crea un observable secundario .subCount$ al que se puede suscribir el cual emitirá cada vez que cambie el número de suscripciones a la fuente observable. Emitirá un número correspondiente al número actual de suscriptores.

Para usarlo, crearía una fuente observable y luego llamaría a new con esta clase para crear el contenedor. Por ejemplo:

const source$ = interval(1000).pipe(take(10));

const myEvent$: CountSubsObservable = new CountSubsObservable(source$);

myEvent$.subCount$.subscribe(numSubs => 
  console.log('subCount$ notification! Number of subscriptions is now', numSubs);
  if(numSubs === 0) 
    // release global resource
   else 
    // allocate global resource, if not yet allocated
  
  // for a scalable resource usage / load,
  // re-configure it, based on numSubs
);

source$.subscribe(result => console.log('result is ', result));

Para verlo en uso, consulte este Stackblitz.

ACTUALIZAR:

De acuerdo, como se mencionó en los comentarios, estoy luchando un poco por entender de dónde proviene el flujo de datos. Mirando hacia atrás a través de su pregunta, veo que está proporcionando una “interfaz de suscripción de eventos”. Si el flujo de datos es un flujo de CustomType como detalla en su tercera actualización anterior, es posible que desee utilizar fromEvent() de rxjs para crear la fuente observable con la que llamaría a la clase contenedora que proporcioné.

Para mostrar esto, creé un nuevo Stackblitz. De ese Stackblitz aquí está la corriente de CustomTypesy cómo usaría la clase CountedObservable para lograr lo que está buscando.

class CustomType 
  a: string;


const dataArray = [
   a: 'January' ,
   a: 'February' ,
   a: 'March' ,
   a: 'April' ,
   a: 'May' ,
   a: 'June' ,
   a: 'July' ,
   a: 'August' ,
   a: 'September' ,
   a: 'October' ,
   a: 'November' ,
   a: 'December' 
] as CustomType[];

// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.  
const source$ = interval(2000).pipe(
  map(i => dataArray[i]), // transform the Observable stream into CustomTypes
  take(dataArray.length),  // limit the Observable to only emit # array elements
  share() // turn into a hot Observable.
);

const myEvent$: CountedObservable = new CountedObservable(source$);

myEvent$.onCount.subscribe(newCount => 
  console.log('newCount notification! Number of subscriptions is now', newCount);
);

Espero que esto ayude.

Si conservas alguna vacilación y disposición de reformar nuestro división eres capaz de ejecutar una disquisición y con mucho gusto lo estudiaremos.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *