Este escrito ha sido aprobado por expertos para asegurar la veracidad de este ensayo.
Solución:
Cuando usted llama CompletableFuture#cancel
, solo detienes la parte aguas abajo de la cadena. parte aguas arriba, es decir, algo que eventualmente llamará complete(...)
o completeExceptionally(...)
no recibe ninguna señal de que el resultado ya no es necesario.
¿Qué son esas cosas ‘aguas arriba’ y ‘aguas abajo’?
Consideremos el siguiente código:
CompletableFuture
.supplyAsync(() -> "hello") //1
.thenApply(s -> s + " world!") //2
.thenAccept(s -> System.out.println(s)); //3
Aquí, los datos fluyen de arriba hacia abajo: desde que los crea el proveedor, los modifica la función y los consume println
. La parte de arriba del paso en particular se llama aguas arriba, y la parte de abajo es aguas abajo. P.ej. los pasos 1 y 2 están aguas arriba del paso 3.
Esto es lo que sucede detrás de escena. Esto no es preciso, más bien es un modelo mental conveniente de lo que está pasando.
- Proveedor (paso 1) se está ejecutando (dentro de la JVM común
ForkJoinPool
). - El resultado del proveedor entonces está siendo pasado por
complete(...)
al siguienteCompletableFuture
río abajo. - Al recibir el resultado, que
CompletableFuture
invoca el siguiente paso: una función (paso 2) que toma el resultado del paso anterior y devuelve algo que se pasará más adelante, hacia abajoCompletableFuture
‘scomplete(...)
. - Al recibir el resultado del paso 2, el paso 3
CompletableFuture
invoca al consumidor,System.out.println(s)
. Una vez que el consumidor ha terminado, el downstreamCompletableFuture
recibirá su valor,(Void) null
Como podemos ver, cada CompletableFuture
en esta cadena tiene que saber quiénes están aguas abajo esperando que el valor pase a sus complete(...)
(o completeExceptionally(...)
). Pero el CompletableFuture
no tiene que saber nada acerca de que está aguas arriba (o aguas arriba, puede haber varias).
De este modo, vocación cancel()
en el paso 3 no aborta los pasos 1 y 2porque no hay ningún enlace del paso 3 al paso 2.
Se supone que si estás usando CompletableFuture
entonces sus pasos son lo suficientemente pequeños para que no haya daño si se ejecutan un par de pasos adicionales.
Si desea que la cancelación se propague aguas arriba, tiene dos opciones:
- Implemente esto usted mismo: cree un
CompletableFuture
(nombralo comocancelled
) que se comprueba después de cada paso (algo así comostep.applyToEither(cancelled, Function.identity())
) - Use pilas reactivas como RxJava 2, ProjectReactor/Flux o Akka Streams
Aparentemente, es intencional. El Javadoc para el método CompletableFuture::cancel dice:
[Parameters:] mayInterruptIfRunning: este valor tiene no efecto en esta implementación porque las interrupciones no se utilizan para controlar el procesamiento.
Curiosamente, el método ForkJoinTask::cancel usa casi la misma redacción para el parámetro mayInterruptIfRunning.
Tengo una conjetura sobre este tema:
- interrupción está destinado a ser utilizado con operaciones de bloqueo, como dormir, Espere o operaciones de E/S,
- pero tampoco CompletableFuturo ni ForkJoinTask están destinados a ser utilizados con operaciones de bloqueo.
En lugar de bloquear, un CompletableFuturo deberia crear una nueva Etapa de finalización, y las tareas vinculadas a la CPU son un requisito previo para el modelo de combinación de bifurcación. Entonces, usando interrupción con cualquiera de ellos frustraría su propósito. Y por otro lado, podría aumentar la complejidad, que no es necesario si se usa según lo previsto.
Necesita una implementación alternativa de CompletionStage para lograr true interrupción del hilo. Acabo de lanzar una pequeña biblioteca que sirve exactamente para este propósito: https://github.com/vsilaev/tascalate-concurrent