Saltar al contenido

Cómo pasar resultados entre observables encadenados

Solución:

Sus métodos definitivamente no deben estar acoplados al contexto, así como tampoco pensar en mapear el resultado a la forma específica.

RxJS tiene que ver con la programación funcional. Y en la programación funcional hay un patrón como Adaptación de argumentos a parámetrosárbitro

Nos permite desacoplar la firma de los métodos del contexto.

Para lograr esto, puede escribir la versión de contexto de map, contentMap, mergMap operadores para que la solución final se vea así:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

Nota _ después de esos operadores.

Ejemplo de Stackblitz

El objetivo de esos operadores personalizados es tomar el objeto de parámetros, pasar por la función de proyección y agregar el resultado de la proyección al objeto de parámetros original.

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
  return map(gatherParams(project, key));
}

function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return concatMap(gatherParamsOperator(projection, key));
}

function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return mergeMap(gatherParamsOperator(projection, key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  };
}

function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
  return (value: V) => ({ [key]: value } as Record<K, V>);
}

Usé sobrecargas de funciones aquí porque a veces no necesitamos agregar claves adicionales a los parámetros. Los parámetros solo deben atravesarlo en caso de this.closePdf(...) método.

Como resultado, obtiene una versión desacoplada de la misma que tenía antes con seguridad de tipos:

ingrese la descripción de la imagen aquí

¿No parece una sobre ingeniería?

En la mayoría de los casos debe seguir YAGNI(No lo vas a necesitar) principio. Y sería mejor no agregar más complejidad al código existente. Para tal escenario, debe ceñirse a una implementación simple de compartir parámetros entre operadores de la siguiente manera:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
  ).subscribe(() => {
    console.log(params)
  });

dónde Params el tipo es:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}

Tenga en cuenta los paréntesis que utilicé en las tareas (params.cloudId = cloudId).

Ejemplo de Stackblitz


También hay muchos otros métodos, pero requieren cambiar su flujo de uso de operadores rxjs:

  • https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1

  • https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1

¡Ciertamente no debería hacer que sus métodos incluyan parámetros que no les conciernen!

A tu pregunta principal:

¿Cómo pasar resultados entre observables encadenados sin los problemas que he mencionado?

Utilice un solo alcance (tuberías anidadas)

El siguiente código es equivalente a su código de muestra, sin necesidad de pasar las propiedades innecesarias. Los valores devueltos anteriormente son accesibles mediante llamadas a funciones más abajo en la cadena:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();

Date cuenta cómo event y headers son accesibles aguas abajo. No es necesario pasarlos a funciones que no los requieran.

¿Hay algún concepto de rxjs que me haya perdido?

Quizás.? Realmente no… 🙂

El truco consiste en tachar un .pipe para agrupar efectivamente a los operadores para que todos tengan acceso a los parámetros de entrada.

Por lo general, tratamos de mantener el código plano dentro del .pipe:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );

pero ese código no es realmente diferente a:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

Pero, en el segundo caso, la línea # 4 tiene acceso al name y el id, mientras que en el primer caso solo tiene acceso a name.

Note que la firma del primero es userId$.pipe(switchMap(), map(), map(), tap())

La segunda es: userId$.pipe(switchMap(), tap()).

Usted puede:

  • asignar el resultado de cada acción a un observable

  • encadenar llamadas a funciones posteriores basadas en resultados anteriores

  • esos resultados se pueden reutilizar en llamadas de acción posteriores a través de withLatestFrom

  • shareReplay se utiliza para prevenir el posterior withLatestFrom suscripciones que hacen que las funciones anteriores se vuelvan a ejecutar

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    

Como se indicó anteriormente, las funciones solo toman los parámetros que requieren y no hay transferencia.

Demostración: https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

Este patrón se puede simplificar mediante el uso de un operador personalizado rxjs (tenga en cuenta que esto podría refinarse aún más, incluida la escritura):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}

Que se puede utilizar como:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}

Demostración: https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *