Solución:
Sus métodos definitivamente no deben estar acoplados al contexto, así como tampoco pensar en mapear el resultado a la forma específica.
RxJS tiene que ver con la programación funcional. Y en la programación funcional hay un patrón como Adaptación de argumentos a parámetrosárbitro
Nos permite desacoplar la firma de los métodos del contexto.
Para lograr esto, puede escribir la versión de contexto de map
, contentMap
, mergMap
operadores para que la solución final se vea así:
this.startUploadEvent$.pipe(
map(withKey('event')),
concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
map_(({ headers }) => this.generateUploadId(headers), 'id'),
tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe(console.log);
Nota _
después de esos operadores.
Ejemplo de Stackblitz
El objetivo de esos operadores personalizados es tomar el objeto de parámetros, pasar por la función de proyección y agregar el resultado de la proyección al objeto de parámetros original.
function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
return map(gatherParams(project, key));
}
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
return concatMap(gatherParamsOperator(projection, key));
}
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
return mergeMap(gatherParamsOperator(projection, key));
}
// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
return (params: P) => {
if (typeof key === 'string') {
return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
}
return params;
};
}
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
return (params: P) => {
return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
};
}
function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
return (value: V) => ({ [key]: value } as Record<K, V>);
}
Usé sobrecargas de funciones aquí porque a veces no necesitamos agregar claves adicionales a los parámetros. Los parámetros solo deben atravesarlo en caso de this.closePdf(...)
método.
Como resultado, obtiene una versión desacoplada de la misma que tenía antes con seguridad de tipos:
¿No parece una sobre ingeniería?
En la mayoría de los casos debe seguir YAGNI(No lo vas a necesitar) principio. Y sería mejor no agregar más complejidad al código existente. Para tal escenario, debe ceñirse a una implementación simple de compartir parámetros entre operadores de la siguiente manera:
ngOnInit() {
const params: Partial<Params> = {};
this.startUploadEvent$.pipe(
concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
map(headers => (params.headers = headers) && this.generateUploadId(headers)),
tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
concatMap(id => this.createPdfDocument(id)),
concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
).subscribe(() => {
console.log(params)
});
dónde Params
el tipo es:
interface Params {
event: any;
headers: any;
uploadId: any;
pdfId: any;
cloudId: any;
}
Tenga en cuenta los paréntesis que utilicé en las tareas (params.cloudId = cloudId)
.
Ejemplo de Stackblitz
También hay muchos otros métodos, pero requieren cambiar su flujo de uso de operadores rxjs:
-
https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1
-
https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1
¡Ciertamente no debería hacer que sus métodos incluyan parámetros que no les conciernen!
A tu pregunta principal:
¿Cómo pasar resultados entre observables encadenados sin los problemas que he mencionado?
Utilice un solo alcance (tuberías anidadas)
El siguiente código es equivalente a su código de muestra, sin necesidad de pasar las propiedades innecesarias. Los valores devueltos anteriormente son accesibles mediante llamadas a funciones más abajo en la cadena:
1 startUploadEvent$.pipe(
2 concatMap(event => getAuthenticationHeaders(event).pipe(
3 map(headers => generateUploadId(event, headers).pipe(
4 tap(id => emitUploadStartEvent(id, event)),
5 concatMap(id => createPdfDocument(event, headers, id)),
6 concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7 tap(cloudId => closePdf(cloudId, event))
8 ))
9 ))
10 ).subscribe();
Date cuenta cómo event
y headers
son accesibles aguas abajo. No es necesario pasarlos a funciones que no los requieran.
¿Hay algún concepto de rxjs que me haya perdido?
Quizás.? Realmente no… 🙂
El truco consiste en tachar un .pipe
para agrupar efectivamente a los operadores para que todos tengan acceso a los parámetros de entrada.
Por lo general, tratamos de mantener el código plano dentro del .pipe
:
1 const greeting$ = userId$.pipe(
2 switchMap(id => http.get(`/users/${id}`)),
3 map(response => response.data.userName),
4 map(name => `Hello ${name}!`),
5 tap(greeting => console.log(greeting))
6 );
pero ese código no es realmente diferente a:
1 const greeting$ = userId$.pipe(
2 switchMap(id => http.get(`/users/${id}`).pipe(
3 map(response => response.data.userName),
4 map(name => `Hello ${name}! (aka User #${id})`)
5 )),
6 tap(greeting => console.log(greeting))
7 );
Pero, en el segundo caso, la línea # 4 tiene acceso al name
y el id
, mientras que en el primer caso solo tiene acceso a name
.
Note que la firma del primero es userId$.pipe(switchMap(), map(), map(), tap())
La segunda es: userId$.pipe(switchMap(), tap())
.
Usted puede:
-
asignar el resultado de cada acción a un observable
-
encadenar llamadas a funciones posteriores basadas en resultados anteriores
-
esos resultados se pueden reutilizar en llamadas de acción posteriores a través de
withLatestFrom
-
shareReplay
se utiliza para prevenir el posteriorwithLatestFrom
suscripciones que hacen que las funciones anteriores se vuelvan a ejecutarfunction startUpload(event$: Observable<string>) { const headers$ = event$.pipe( concatMap(event => getAuthenticationHeaders(event)), shareReplay() ); const id$ = headers$.pipe( map(() => generateUploadId()), shareReplay() ); const emitUploadEvent$ = id$.pipe( withLatestFrom(event$), // use earlier result map(([id, event]) => emitUploadStartEvent(id, event)), shareReplay() ); // etc }
Como se indicó anteriormente, las funciones solo toman los parámetros que requieren y no hay transferencia.
Demostración: https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts
Este patrón se puede simplificar mediante el uso de un operador personalizado rxjs (tenga en cuenta que esto podría refinarse aún más, incluida la escritura):
function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
action: (...args: any[]) => R,
ignoreInput: boolean,
...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
return (input: Observable<T>) => input.pipe(
withLatestFrom(...observableArgs),
operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
shareReplay(1)
);
}
Que se puede utilizar como:
function startUpload(event$: Observable<string>) {
const headers$ = event$.pipe(
call(concatMap, getAuthenticationHeaders, true)
);
const id$ = headers$.pipe(
call(map, generateUploadId, false)
);
const startEmitted$ = id$.pipe(
call(map, emitUploadStartEvent, true, event$)
);
const pdfId$ = startEmitted$.pipe(
call(map, createPdfDocument, false, event$, headers$, id$)
);
const uploaded$ = pdfId$.pipe(
call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
);
const cloudId$ = uploaded$.pipe(
call(map, closePdf, false, headers$, pdfId$)
);
const uploadDone$ = cloudId$.pipe(
call(map, emitUploadDoneEvent, true, id$, event$)
);
// return cloudId$ instead of uploadDone$ but preserve observable chain
return uploadDone$.pipe(concatMap(() => cloudId$));
}
Demostración: https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts