Te doy la bienvenida a nuestra página, en este sitio encontrarás la solucíon de lo que andabas buscando.
Solución:
Puede lograrlo usando aplazar para rastrear suscripciones y finalizar para rastrear finalizaciones, por ejemplo, como operador:
// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop)
return function refCountOperatorFunction(source$)
let counter = 0;
return defer(()=>
counter++;
onCountUpdate(counter);
return source$;
)
.pipe(
finalize(()=>
counter--;
onCountUpdate(counter);
)
);
;
// just a stub for `onCountUpdate`
function noop()
Y luego úsalo como:
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
Aquí hay un fragmento de código que ilustra esto:
const Subject, of, timer, pipe, defer = rxjs;
const finalize, takeUntil = rxjs.operators;
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
// emit events
setTimeout(()=>
source$.next('one');
, 250);
setTimeout(()=>
source$.next('two');
, 1000);
setTimeout(()=>
source$.next('three');
, 1250);
setTimeout(()=>
source$.next('four');
, 1750);
// subscribe and unsubscribe
const subscriptionA = result$
.subscribe(value => console.log('A', value));
setTimeout(()=>
result$.subscribe(value => console.log('B', value));
, 500);
setTimeout(()=>
result$.subscribe(value => console.log('C', value));
, 1000);
setTimeout(()=>
subscriptionA.unsubscribe();
, 1500);
// complete source
setTimeout(()=>
source$.complete();
, 2000);
function customOperator(onCountUpdate = noop)
return function refCountOperatorFunction(source$)
let counter = 0;
return defer(()=>
counter++;
onCountUpdate(counter);
return source$;
)
.pipe(
finalize(()=>
counter--;
onCountUpdate(counter);
)
);
;
function noop()
* NOTA: si su fuente $ está fría, es posible que deba compartirla.
Espero eso ayude
Realmente está haciendo tres preguntas distintas aquí, y me pregunto si realmente necesita la capacidad completa que menciona. Dado que la mayor parte de gestión de recursos La biblioteca ya proporciona las cosas que solicita, por lo que hacer un código de seguimiento personalizado parece ser redundante. Las dos primeras preguntas:
- Asignar recursos globales cuando el número de suscripciones sea mayor que 0
- Liberar recurso global cuando el número de suscripciones sea 0
Se puede hacer con el using
+ share
operadores:
class ExpensiveResource
constructor ()
// Do construction
unsubscribe ()
// Do Tear down
// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
// Creates an expensive resource
() => new ExpensiveResource(),
// Passes that expensive resource to an Observable factory function
er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())
Después sharedStream$
se puede pasar como un flujo base que administrará el recurso subyacente (asumiendo que implementó su unsubscribe
correctamente) para que el recurso se cree y elimine a medida que el número de suscriptores cambia entre 0 y 1.
-
Ajustar la estrategia de uso de recursos según la cantidad de suscripciones
La tercera pregunta sobre la que tengo más dudas, pero la responderé para que esté completa, suponiendo que conozca su aplicación mejor que yo (ya que no puedo pensar en una razón por la que necesitaría un manejo específico en diferentes niveles de uso además de ir entre 0 y 1).
Básicamente, usaría un enfoque similar al anterior, pero encapsularía la lógica de transición de manera ligeramente diferente.
// Same as above
class ExpensiveResource
unsubscribe() console.log('Tear down this resource!')
const usingReferenceTracking =
(onUp, onDown) => (resourceFactory, streamFactory) => r$)
// Use finalize to handle the "onFinish" side-effects
.pipe(finalize(() => onDown(instance, refCount -= 1)))
const referenceTracked$ = usingReferenceTracking(
(ref, count) => console.log('Ref count increased to ' + count),
(ref, count) => console.log('Ref count decreased to ' + count)
)(
() => new ExpensiveResource(),
ref => timer(1000)
)
referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))
// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!
Advertencia: Un efecto secundario de esto es que, por definición, el arroyo estará caliente una vez que salga del usingReferenceTracking
función, y se activará en la primera suscripción. Asegúrese de tener esto en cuenta durante la fase de suscripción.
¡Qué problema tan divertido! Si entiendo lo que está preguntando, aquí está mi solución a esto: cree una clase contenedora alrededor de Observable que rastree las suscripciones interceptando ambos subscribe()
y unsubscribe()
. Aquí está la clase contenedora:
export class CountSubsObservable extends Observable
private _subCount = 0;
private _subCount$: BehaviorSubject = new BehaviorSubject(0);
public subCount$ = this._subCount$.asObservable();
constructor(public source: Observable)
super();
subscribe(
observerOrNext?: PartialObserver
Esta envoltura crea un observable secundario .subCount$
al que se puede suscribir el cual emitirá cada vez que cambie el número de suscripciones a la fuente observable. Emitirá un número correspondiente al número actual de suscriptores.
Para usarlo, crearía una fuente observable y luego llamaría a new con esta clase para crear el contenedor. Por ejemplo:
const source$ = interval(1000).pipe(take(10));
const myEvent$: CountSubsObservable = new CountSubsObservable(source$);
myEvent$.subCount$.subscribe(numSubs =>
console.log('subCount$ notification! Number of subscriptions is now', numSubs);
if(numSubs === 0)
// release global resource
else
// allocate global resource, if not yet allocated
// for a scalable resource usage / load,
// re-configure it, based on numSubs
);
source$.subscribe(result => console.log('result is ', result));
Para verlo en uso, consulte este Stackblitz.
ACTUALIZAR:
De acuerdo, como se mencionó en los comentarios, estoy luchando un poco por entender de dónde proviene el flujo de datos. Mirando hacia atrás a través de su pregunta, veo que está proporcionando una “interfaz de suscripción de eventos”. Si el flujo de datos es un flujo de CustomType
como detalla en su tercera actualización anterior, es posible que desee utilizar fromEvent()
de rxjs
para crear la fuente observable con la que llamaría a la clase contenedora que proporcioné.
Para mostrar esto, creé un nuevo Stackblitz. De ese Stackblitz aquí está la corriente de CustomType
sy cómo usaría la clase CountedObservable para lograr lo que está buscando.
class CustomType
a: string;
const dataArray = [
a: 'January' ,
a: 'February' ,
a: 'March' ,
a: 'April' ,
a: 'May' ,
a: 'June' ,
a: 'July' ,
a: 'August' ,
a: 'September' ,
a: 'October' ,
a: 'November' ,
a: 'December'
] as CustomType[];
// Set up an arbitrary source that sends a stream of `CustomTypes`, one
// every two seconds by using `interval` and mapping the numbers into
// the associated dataArray.
const source$ = interval(2000).pipe(
map(i => dataArray[i]), // transform the Observable stream into CustomTypes
take(dataArray.length), // limit the Observable to only emit # array elements
share() // turn into a hot Observable.
);
const myEvent$: CountedObservable = new CountedObservable(source$);
myEvent$.onCount.subscribe(newCount =>
console.log('newCount notification! Number of subscriptions is now', newCount);
);
Espero que esto ayude.
Si conservas alguna vacilación y disposición de reformar nuestro división eres capaz de ejecutar una disquisición y con mucho gusto lo estudiaremos.