Después de de una prolongada selección de información pudimos resolver esta duda que tienen algunos usuarios. Te dejamos la respuesta y deseamos serte de mucha apoyo.
Solución:
Este es un problema interesante. Jugué con varias combinaciones de Timer.publish
, buffer
, zip
, y throttle
, pero no pude conseguir que ninguna combinación funcionara de la forma deseada. Así que escribamos un suscriptor personalizado.
Lo que realmente nos gustaría es una API donde, cuando obtenemos una entrada desde el flujo ascendente, también tenemos la capacidad de controlar cuándo el flujo ascendente entrega la siguiente entrada. Algo como esto:
extension Publisher
/// Subscribe to me with a stepping function.
/// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
/// Each time I call this function with an input, I also give it a promise function.
/// I won't deliver the next input until the promise is called with a `.more` argument.
/// - returns: An object you can use to cancel the subscription asynchronously.
func step(with stepper: @escaping (StepEvent
Con este step
API, podemos escribir una pace
operador que hace lo que quieres:
extension Publisher
func pace(
_ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
-> AnyCancellable
where MySubject.Output == Output, MySubject.Failure == Failure
return step
switch $0
case .input(let input, let promise):
// Send the input from upstream now.
subject.send(input)
// Wait for the pace interval to elapse before requesting the
// next input from upstream.
scheduler.schedule(after: scheduler.now.advanced(by: pace))
promise(.more)
case .completion(let completion):
subject.send(completion: completion)
Esta pace
operador toma pace
(el intervalo requerido entre salidas), un programador en el que programar eventos y un subject
en el que volver a publicar las entradas de upstream. Maneja cada entrada enviándola a través de subject
y luego usar el programador para esperar el intervalo de ritmo antes de solicitar la siguiente entrada desde el flujo ascendente.
Ahora solo tenemos que implementar el step
operador. Combine no nos ayuda demasiado aquí. Tiene una función llamada “contrapresión”, lo que significa que un editor no puede enviar una entrada en sentido descendente hasta que el descendente lo solicite enviando un Subscribers.Demand
río arriba. Por lo general, ves que las aguas abajo envían un .unlimited
demanda aguas arriba, pero no vamos a hacerlo. En cambio, vamos a aprovechar la contrapresión. No enviaremos ninguna demanda en sentido ascendente hasta que el paso a paso complete una promesa, y luego solo enviaremos una demanda de .max(1)
, por lo que hacemos que el flujo ascendente funcione en sincronía con el paso a paso. (También tenemos que enviar una demanda inicial de .max(1)
para iniciar todo el proceso.)
De acuerdo, entonces necesito implementar un tipo que tome una función paso a paso y se ajuste a Subscriber
. Es una buena idea revisar la Especificación de JVM de Flujos reactivos, porque Combine se basa en esa especificación.
Lo que dificulta la implementación es que varias cosas pueden llamar a nuestro suscriptor de forma asincrónica:
- El upstream puede llamar al suscriptor desde cualquier hilo (pero es necesario para serializar sus llamadas).
- Una vez que le hemos dado funciones de promesa al paso a paso, el paso a paso puede llamar a esas promesas en cualquier hilo.
- Queremos que la suscripción sea cancelable, y esa cancelación puede ocurrir en cualquier hilo.
- Toda esta asincronicidad significa que tenemos que proteger nuestro estado interno con un candado.
- Tenemos que tener cuidado de no llamar mientras sujetamos ese candado, para evitar un punto muerto.
También protegeremos al suscriptor de las travesuras que implican llamar a una promesa repetidamente, o llamar a promesas obsoletas, dándole a cada promesa una identificación única.
Aquí está nuestra definición básica de suscriptor:
import Combine
import Foundation
public class SteppingSubscriber
public init(stepper: @escaping Stepper)
l_state = .subscribing(stepper)
public typealias Stepper = (Event) -> ()
public enum Event
case input(Input, Promise)
case completion(Completion)
public typealias Promise = (Request) -> ()
public enum Request
case more
case cancel
public typealias Completion = Subscribers.Completion
private let lock = NSLock()
// The l_ prefix means it must only be accessed while holding the lock.
private var l_state: State
private var l_nextPromiseId: PromiseId = 1
private typealias PromiseId = Int
private var noPromiseId: PromiseId 0
Observe que moví los tipos auxiliares de antes (StepEvent
, StepPromise
, y StepPromiseRequest
) dentro SteppingSubscriber
y acortó sus nombres.
Ahora consideremos l_state
tipo misterioso, State
. ¿Cuáles son los diferentes estados en los que podría estar nuestro suscriptor?
- Podríamos estar esperando recibir el
Subscription
objeto de aguas arriba. - Podríamos haber recibido el
Subscription
desde aguas arriba y estar esperando una señal (una entrada o finalización desde aguas arriba, o la finalización de una promesa del paso a paso). - Podríamos estar llamando al paso a paso, que queremos tener cuidado en caso de que cumpla una promesa mientras lo llamamos.
- Podríamos haber sido cancelados o haber recibido la finalización de upstream.
Así que aquí está nuestra definición de State
:
extension SteppingSubscriber
private enum State
// Completed or cancelled.
case dead
// Waiting for Subscription from upstream.
case subscribing(Stepper)
// Waiting for a signal from upstream or for the latest promise to be completed.
case subscribed(Subscribed)
// Calling out to the stopper.
case stepping(Stepping)
var subscription: Subscription?
switch self
case .dead: return nil
case .subscribing(_): return nil
case .subscribed(let subscribed): return subscribed.subscription
case .stepping(let stepping): return stepping.subscribed.subscription
struct Subscribed
var stepper: Stepper
var subscription: Subscription
var validPromiseId: PromiseId
struct Stepping
var subscribed: Subscribed
// If the stepper completes the current promise synchronously with .more,
// I set this to true.
var shouldRequestMore: Bool
Ya que estamos usando NSLock
(para simplificar), definamos una extensión para asegurarnos de que siempre hacemos coincidir el bloqueo con el desbloqueo:
fileprivate extension NSLock
@inline(__always)
func sync(_ body: () -> Answer) -> Answer
lock()
defer unlock()
return body()
Ahora estamos listos para manejar algunos eventos. El evento más fácil de manejar es la cancelación asincrónica, que es la Cancellable
único requisito del protocolo. Si estamos en cualquier estado excepto .dead
, queremos convertirnos .dead
y, si hay una suscripción ascendente, cancélela.
extension SteppingSubscriber: Cancellable
public func cancel()
let sub: Subscription? = lock.sync
defer l_state = .dead
return l_state.subscription
sub?.cancel()
Observe aquí que no quiero llamar a la suscripción de upstream cancel
funcionar mientras lock
está bloqueado, porque lock
no es un bloqueo recursivo y no quiero arriesgarme a un punto muerto. Todo uso de lock.sync
sigue el patrón de aplazar cualquier llamada hasta que se desbloquee la cerradura.
Ahora implementemos el Subscriber
requisitos de protocolo. Primero, manejemos la recepción del Subscription
desde aguas arriba. La única vez que esto debería suceder es cuando estamos en el .subscribing
estado, pero .dead
También es posible, en cuyo caso solo queremos cancelar la suscripción upstream.
extension SteppingSubscriber: Subscriber
public func receive(subscription: Subscription)
let action: () -> () = lock.sync
guard case .subscribing(let stepper) = l_state else
return subscription.cancel()
l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
return subscription.request(.max(1))
action()
Tenga en cuenta que en este uso de lock.sync
(y en todos los usos posteriores), devuelvo un cierre de “acción” para poder realizar llamadas arbitrarias después de que se haya desbloqueado el bloqueo.
El siguiente Subscriber
El requisito de protocolo que abordaremos es recibir una finalización:
public func receive(completion: Subscribers.Completion)
let action: (() -> ())? = lock.sync
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream already completed (and shouldn't call this again),
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
// me again concurrently.
guard case .subscribed(let subscribed) = l_state else
return nil
l_state = .dead
return [stepper = subscribed.stepper] in
stepper(.completion(completion))
action?()
El mas complejo Subscriber
El requisito de protocolo para nosotros es recibir un Input
:
- Tenemos que crear una promesa.
- Tenemos que pasar la promesa al paso a paso.
- El paso a paso podría completar la promesa antes de regresar.
- Después de que regrese el stepper, tenemos que verificar si completó la promesa con
.more
y, de ser así, devolver la demanda adecuada en sentido ascendente.
Como tenemos que llamar al paso a paso en medio de este trabajo, tenemos algunos desagradables anidamientos de lock.sync
llamadas.
public func receive(_ input: Input) -> Subscribers.Demand
let action: (() -> Subscribers.Demand)? = lock.sync
// The only state in which I have to handle this call is .subscribed:
// - If I'm .dead, either upstream completed and shouldn't call this,
// or I've been cancelled.
// - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
// - If I'm .stepping, upstream is currently signalling me and isn't allowed to
// signal me again concurrently.
guard case .subscribed(var subscribed) = l_state else
return nil
let promiseId = l_nextPromiseId
l_nextPromiseId += 1
let promise: Promise = request in
self.completePromise(id: promiseId, request: request)
subscribed.validPromiseId = promiseId
l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
return [stepper = subscribed.stepper] in
stepper(.input(input, promise))
let demand: Subscribers.Demand = self.lock.sync
// The only possible states now are .stepping and .dead.
guard case .stepping(let stepping) = self.l_state else
return .none
self.l_state = .subscribed(stepping.subscribed)
return stepping.shouldRequestMore ? .max(1) : .none
return demand
return action?() ?? .none
// end of extension SteppingSubscriber: Publisher
Lo último que nuestro suscriptor debe manejar es completar una promesa. Esto es complicado por varias razones:
- Queremos protegernos de que una promesa se complete varias veces.
- Queremos protegernos contra el cumplimiento de una promesa anterior.
- Podemos estar en cualquier estado cuando se completa una promesa.
Por lo tanto:
extension SteppingSubscriber
private func completePromise(id: PromiseId, request: Request)
let action: (() -> ())? = lock.sync
switch l_state
case .dead, .subscribing(_): return nil
case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
subscribed.validPromiseId = noPromiseId
l_state = .subscribed(subscribed)
return [sub = subscribed.subscription] in
sub.request(.max(1))
case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return [sub = subscribed.subscription] in
sub.cancel()
case .subscribed(_):
// Multiple completion or stale promise.
return nil
case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
stepping.subscribed.validPromiseId = noPromiseId
stepping.shouldRequestMore = true
l_state = .stepping(stepping)
return nil
case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
l_state = .dead
return [sub = stepping.subscribed.subscription] in
sub.cancel()
case .stepping(_):
// Multiple completion or stale promise.
return nil
action?()
¡Uf!
Con todo eso hecho, podemos escribir lo real step
operador:
extension Publisher
func step(with stepper: @escaping (SteppingSubscriber
Y luego podemos probar eso pace
operador desde arriba. Dado que no hacemos ningún almacenamiento en búfer en SteppingSubscriber
, y el flujo ascendente en general no está almacenado en búfer, colocaremos un buffer
entre el río arriba y nuestro pace
operador.
var cans: [AnyCancellable] = []
func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool
let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
.merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
.handleEvents(
receiveOutput: print("erratic: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") ,
receiveCompletion: print("erratic: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)")
)
.makeConnectable()
let subject = PassthroughSubject()
cans += [erratic
.buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
.pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]
cans += [subject.sink(
receiveCompletion: print("paced: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") ,
receiveValue: print("paced: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)")
)]
let c = erratic.connect()
cans += [AnyCancellable c.cancel() ]
return true
Y aquí, por fin, está el resultado:
erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
- Las marcas de tiempo están en unidades de segundos.
- Los horarios erráticos del editor son, de hecho, erráticos y, a veces, cercanos en el tiempo.
- Los tiempos de ritmo están siempre separados por al menos un segundo, incluso cuando los eventos erráticos ocurren con menos de un segundo de diferencia.
- Cuando ocurre un evento errático más de un segundo después del evento anterior, el evento con ritmo se envía inmediatamente después del evento errático sin más demora.
- La finalización con ritmo ocurre un segundo después del último evento con ritmo, aunque la finalización errática ocurre inmediatamente después del último evento errático. los
buffer
no envía la finalización hasta que recibe otra demanda después de enviar el último evento, y esa demanda es retrasada por el temporizador de estimulación.
He puesto toda la implementación del step
operador en esta esencia para copiar / pegar fácilmente.
EDITAR
Existe un enfoque aún más simple que el original que se describe a continuación, que no requiere un marcapasos, sino que utiliza la contrapresión creada por flatMap(maxPublishers: .max(1))
.
flatMap
envía una demanda de 1, hasta que finalice su editor devuelto, lo que podríamos retrasar. Necesitaríamos un Buffer
editor aguas arriba para almacenar los valores.
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject()
let interval = 1.0
let pub = subject
.buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers: .max(1))
Just($0)
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
ORIGINAL
Sé que esta es una pregunta antigua, pero creo que hay una forma mucho más sencilla de implementarla, así que pensé en compartirla.
La idea es similar a una .zip
con un Timer
, excepto en lugar de un Timer
, lo harías .zip
con un “tick” retardado de un valor enviado previamente, que se puede lograr con un CurrentValueSubject
. CurrentValueSubject
es necesario en lugar de un PassthroughSubject
para sembrar la primera “garrapata”.
// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject()
let pacer = CurrentValueSubject(())
let interval = 1.0
let pub = subject.zip(pacer)
.flatMap v in
Just(v.0) // extract the original value
.delay(for: .seconds(interval), scheduler: DispatchQueue.main)
.handleEvents(receiveOutput: _ in
pacer.send() // send the pacer "tick" after the interval
)
Lo que pasa es que el .zip
puertas en el marcapasos, que solo llega después de un retraso de un valor enviado previamente.
Si el siguiente valor llega antes del intervalo permitido, espera el marcapasos. Sin embargo, si el siguiente valor llega más tarde, entonces el marcapasos ya tiene un nuevo valor para proporcionar instantáneamente, por lo que no habría demora.
Si lo usó como en su caso de prueba:
let c = pub.sink print("($0): (Date())")
subject.send(Date())
subject.send(Date())
subject.send(Date())
DispatchQueue.main.asyncAfter(deadline: .now() + 1.0)
subject.send(Date())
subject.send(Date())
DispatchQueue.main.asyncAfter(deadline: .now() + 10.0)
subject.send(Date())
subject.send(Date())
el resultado sería algo como esto:
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000
Podría Publishers.CollectByTime
ser útil aquí en alguna parte?
Publishers.CollectByTime(upstream: upstreamPublisher.share(), strategy: Publishers.TimeGroupingStrategy.byTime(RunLoop.main, .seconds(1)), options: nil)
Más adelante puedes encontrar las aclaraciones de otros administradores, tú igualmente eres capaz mostrar el tuyo si te gusta.