Saltar al contenido

Swift Combine: ¿Almacenar valores aguas arriba y emitirlos a un ritmo constante?

Después de de una prolongada selección de información pudimos resolver esta duda que tienen algunos usuarios. Te dejamos la respuesta y deseamos serte de mucha apoyo.

Solución:

Este es un problema interesante. Jugué con varias combinaciones de Timer.publish, buffer, zip, y throttle, pero no pude conseguir que ninguna combinación funcionara de la forma deseada. Así que escribamos un suscriptor personalizado.

Lo que realmente nos gustaría es una API donde, cuando obtenemos una entrada desde el flujo ascendente, también tenemos la capacidad de controlar cuándo el flujo ascendente entrega la siguiente entrada. Algo como esto:

extension Publisher 
    /// Subscribe to me with a stepping function.
    /// - parameter stepper: A function I'll call with each of my inputs, and with my completion.
    ///   Each time I call this function with an input, I also give it a promise function.
    ///   I won't deliver the next input until the promise is called with a `.more` argument.
    /// - returns: An object you can use to cancel the subscription asynchronously.
    func step(with stepper: @escaping (StepEvent) -> ()) -> AnyCancellable 
        ???
    


enum StepEvent 
    /// Handle the Input. Call `StepPromise` when you're ready for the next Input,
    /// or to cancel the subscription.
    case input(Input, StepPromise)

    /// Upstream completed the subscription.
    case completion(Subscribers.Completion)


/// The type of callback given to the stepper function to allow it to continue
/// or cancel the stream.
typealias StepPromise = (StepPromiseRequest) -> ()

enum StepPromiseRequest 
    // Pass this to the promise to request the next item from upstream.
    case more

    // Pass this to the promise to cancel the subscription.
    case cancel

Con este step API, podemos escribir una pace operador que hace lo que quieres:

extension Publisher 
    func pace(
        _ pace: Context.SchedulerTimeType.Stride, scheduler: Context, subject: MySubject)
        -> AnyCancellable
        where MySubject.Output == Output, MySubject.Failure == Failure
    
        return step 
            switch $0 
            case .input(let input, let promise):
                // Send the input from upstream now.
                subject.send(input)

                // Wait for the pace interval to elapse before requesting the
                // next input from upstream.
                scheduler.schedule(after: scheduler.now.advanced(by: pace)) 
                    promise(.more)
                

            case .completion(let completion):
                subject.send(completion: completion)
            
        
    

Esta pace operador toma pace (el intervalo requerido entre salidas), un programador en el que programar eventos y un subject en el que volver a publicar las entradas de upstream. Maneja cada entrada enviándola a través de subjecty luego usar el programador para esperar el intervalo de ritmo antes de solicitar la siguiente entrada desde el flujo ascendente.

Ahora solo tenemos que implementar el step operador. Combine no nos ayuda demasiado aquí. Tiene una función llamada “contrapresión”, lo que significa que un editor no puede enviar una entrada en sentido descendente hasta que el descendente lo solicite enviando un Subscribers.Demand río arriba. Por lo general, ves que las aguas abajo envían un .unlimited demanda aguas arriba, pero no vamos a hacerlo. En cambio, vamos a aprovechar la contrapresión. No enviaremos ninguna demanda en sentido ascendente hasta que el paso a paso complete una promesa, y luego solo enviaremos una demanda de .max(1), por lo que hacemos que el flujo ascendente funcione en sincronía con el paso a paso. (También tenemos que enviar una demanda inicial de .max(1) para iniciar todo el proceso.)

De acuerdo, entonces necesito implementar un tipo que tome una función paso a paso y se ajuste a Subscriber. Es una buena idea revisar la Especificación de JVM de Flujos reactivos, porque Combine se basa en esa especificación.

Lo que dificulta la implementación es que varias cosas pueden llamar a nuestro suscriptor de forma asincrónica:

  • El upstream puede llamar al suscriptor desde cualquier hilo (pero es necesario para serializar sus llamadas).
  • Una vez que le hemos dado funciones de promesa al paso a paso, el paso a paso puede llamar a esas promesas en cualquier hilo.
  • Queremos que la suscripción sea cancelable, y esa cancelación puede ocurrir en cualquier hilo.
  • Toda esta asincronicidad significa que tenemos que proteger nuestro estado interno con un candado.
  • Tenemos que tener cuidado de no llamar mientras sujetamos ese candado, para evitar un punto muerto.

También protegeremos al suscriptor de las travesuras que implican llamar a una promesa repetidamente, o llamar a promesas obsoletas, dándole a cada promesa una identificación única.

Aquí está nuestra definición básica de suscriptor:

import Combine
import Foundation

public class SteppingSubscriber 

    public init(stepper: @escaping Stepper) 
        l_state = .subscribing(stepper)
    

    public typealias Stepper = (Event) -> ()

    public enum Event 
        case input(Input, Promise)
        case completion(Completion)
    

    public typealias Promise = (Request) -> ()

    public enum Request 
        case more
        case cancel
    

    public typealias Completion = Subscribers.Completion

    private let lock = NSLock()

    // The l_ prefix means it must only be accessed while holding the lock.
    private var l_state: State
    private var l_nextPromiseId: PromiseId = 1

    private typealias PromiseId = Int

    private var noPromiseId: PromiseId  0 

Observe que moví los tipos auxiliares de antes (StepEvent, StepPromise, y StepPromiseRequest) dentro SteppingSubscriber y acortó sus nombres.

Ahora consideremos l_statetipo misterioso, State. ¿Cuáles son los diferentes estados en los que podría estar nuestro suscriptor?

  • Podríamos estar esperando recibir el Subscription objeto de aguas arriba.
  • Podríamos haber recibido el Subscription desde aguas arriba y estar esperando una señal (una entrada o finalización desde aguas arriba, o la finalización de una promesa del paso a paso).
  • Podríamos estar llamando al paso a paso, que queremos tener cuidado en caso de que cumpla una promesa mientras lo llamamos.
  • Podríamos haber sido cancelados o haber recibido la finalización de upstream.

Así que aquí está nuestra definición de State:

extension SteppingSubscriber 
    private enum State 
        // Completed or cancelled.
        case dead

        // Waiting for Subscription from upstream.
        case subscribing(Stepper)

        // Waiting for a signal from upstream or for the latest promise to be completed.
        case subscribed(Subscribed)

        // Calling out to the stopper.
        case stepping(Stepping)

        var subscription: Subscription? 
            switch self 
            case .dead: return nil
            case .subscribing(_): return nil
            case .subscribed(let subscribed): return subscribed.subscription
            case .stepping(let stepping): return stepping.subscribed.subscription
            
        

        struct Subscribed 
            var stepper: Stepper
            var subscription: Subscription
            var validPromiseId: PromiseId
        

        struct Stepping 
            var subscribed: Subscribed

            // If the stepper completes the current promise synchronously with .more,
            // I set this to true.
            var shouldRequestMore: Bool
        
    

Ya que estamos usando NSLock (para simplificar), definamos una extensión para asegurarnos de que siempre hacemos coincidir el bloqueo con el desbloqueo:

fileprivate extension NSLock 
    @inline(__always)
    func sync(_ body: () -> Answer) -> Answer 
        lock()
        defer  unlock() 
        return body()
    

Ahora estamos listos para manejar algunos eventos. El evento más fácil de manejar es la cancelación asincrónica, que es la Cancellable único requisito del protocolo. Si estamos en cualquier estado excepto .dead, queremos convertirnos .dead y, si hay una suscripción ascendente, cancélela.

extension SteppingSubscriber: Cancellable 
    public func cancel() 
        let sub: Subscription? = lock.sync 
            defer  l_state = .dead 
            return l_state.subscription
        
        sub?.cancel()
    

Observe aquí que no quiero llamar a la suscripción de upstream cancel funcionar mientras lock está bloqueado, porque lock no es un bloqueo recursivo y no quiero arriesgarme a un punto muerto. Todo uso de lock.sync sigue el patrón de aplazar cualquier llamada hasta que se desbloquee la cerradura.

Ahora implementemos el Subscriber requisitos de protocolo. Primero, manejemos la recepción del Subscription desde aguas arriba. La única vez que esto debería suceder es cuando estamos en el .subscribing estado, pero .dead También es posible, en cuyo caso solo queremos cancelar la suscripción upstream.

extension SteppingSubscriber: Subscriber 
    public func receive(subscription: Subscription) 
        let action: () -> () = lock.sync 
            guard case .subscribing(let stepper) = l_state else 
                return  subscription.cancel() 
            
            l_state = .subscribed(.init(stepper: stepper, subscription: subscription, validPromiseId: noPromiseId))
            return  subscription.request(.max(1)) 
        
        action()
    

Tenga en cuenta que en este uso de lock.sync (y en todos los usos posteriores), devuelvo un cierre de “acción” para poder realizar llamadas arbitrarias después de que se haya desbloqueado el bloqueo.

El siguiente Subscriber El requisito de protocolo que abordaremos es recibir una finalización:

    public func receive(completion: Subscribers.Completion) 
        let action: (() -> ())? = lock.sync 
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream already completed (and shouldn't call this again),
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me a completion.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to signal
            //   me again concurrently.
            guard case .subscribed(let subscribed) = l_state else 
                return nil
            
            l_state = .dead
            return  [stepper = subscribed.stepper] in
                stepper(.completion(completion))
            
        
        action?()
    

El mas complejo Subscriber El requisito de protocolo para nosotros es recibir un Input:

  • Tenemos que crear una promesa.
  • Tenemos que pasar la promesa al paso a paso.
  • El paso a paso podría completar la promesa antes de regresar.
  • Después de que regrese el stepper, tenemos que verificar si completó la promesa con .more y, de ser así, devolver la demanda adecuada en sentido ascendente.

Como tenemos que llamar al paso a paso en medio de este trabajo, tenemos algunos desagradables anidamientos de lock.sync llamadas.

    public func receive(_ input: Input) -> Subscribers.Demand 
        let action: (() -> Subscribers.Demand)? = lock.sync 
            // The only state in which I have to handle this call is .subscribed:
            // - If I'm .dead, either upstream completed and shouldn't call this,
            //   or I've been cancelled.
            // - If I'm .subscribing, upstream must send me a Subscription before sending me Input.
            // - If I'm .stepping, upstream is currently signalling me and isn't allowed to
            //   signal me again concurrently.
            guard case .subscribed(var subscribed) = l_state else 
                return nil
            

            let promiseId = l_nextPromiseId
            l_nextPromiseId += 1
            let promise: Promise =  request in
                self.completePromise(id: promiseId, request: request)
            
            subscribed.validPromiseId = promiseId
            l_state = .stepping(.init(subscribed: subscribed, shouldRequestMore: false))
            return  [stepper = subscribed.stepper] in
                stepper(.input(input, promise))

                let demand: Subscribers.Demand = self.lock.sync 
                    // The only possible states now are .stepping and .dead.
                    guard case .stepping(let stepping) = self.l_state else 
                        return .none
                    
                    self.l_state = .subscribed(stepping.subscribed)
                    return stepping.shouldRequestMore ? .max(1) : .none
                

                return demand
            
        

        return action?() ?? .none
    
 // end of extension SteppingSubscriber: Publisher

Lo último que nuestro suscriptor debe manejar es completar una promesa. Esto es complicado por varias razones:

  • Queremos protegernos de que una promesa se complete varias veces.
  • Queremos protegernos contra el cumplimiento de una promesa anterior.
  • Podemos estar en cualquier estado cuando se completa una promesa.

Por lo tanto:

extension SteppingSubscriber 
    private func completePromise(id: PromiseId, request: Request) 
        let action: (() -> ())? = lock.sync 
            switch l_state 
            case .dead, .subscribing(_): return nil
            case .subscribed(var subscribed) where subscribed.validPromiseId == id && request == .more:
                subscribed.validPromiseId = noPromiseId
                l_state = .subscribed(subscribed)
                return  [sub = subscribed.subscription] in
                    sub.request(.max(1))
                
            case .subscribed(let subscribed) where subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return  [sub = subscribed.subscription] in
                    sub.cancel()
                
            case .subscribed(_):
                // Multiple completion or stale promise.
                return nil
            case .stepping(var stepping) where stepping.subscribed.validPromiseId == id && request == .more:
                stepping.subscribed.validPromiseId = noPromiseId
                stepping.shouldRequestMore = true
                l_state = .stepping(stepping)
                return nil
            case .stepping(let stepping) where stepping.subscribed.validPromiseId == id && request == .cancel:
                l_state = .dead
                return  [sub = stepping.subscribed.subscription] in
                    sub.cancel()
                
            case .stepping(_):
                // Multiple completion or stale promise.
                return nil
            
        

        action?()
    

¡Uf!

Con todo eso hecho, podemos escribir lo real step operador:

extension Publisher 
    func step(with stepper: @escaping (SteppingSubscriber.Event) -> ()) -> AnyCancellable 
        let subscriber = SteppingSubscriber(stepper: stepper)
        self.subscribe(subscriber)
        return .init(subscriber)
    

Y luego podemos probar eso pace operador desde arriba. Dado que no hacemos ningún almacenamiento en búfer en SteppingSubscriber, y el flujo ascendente en general no está almacenado en búfer, colocaremos un buffer entre el río arriba y nuestro pace operador.

    var cans: [AnyCancellable] = []

    func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool 
        let erratic = Just("A").delay(for: 0.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher()
            .merge(with: Just("B").delay(for: 0.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("C").delay(for: 0.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("D").delay(for: 5.0, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("E").delay(for: 5.3, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .merge(with: Just("F").delay(for: 5.6, tolerance: 0.001, scheduler: DispatchQueue.main).eraseToAnyPublisher())
            .handleEvents(
                receiveOutput:  print("erratic: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") ,
                receiveCompletion:  print("erratic: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") 
        )
            .makeConnectable()

        let subject = PassthroughSubject()

        cans += [erratic
            .buffer(size: 1000, prefetch: .byRequest, whenFull: .dropOldest)
            .pace(.seconds(1), scheduler: DispatchQueue.main, subject: subject)]

        cans += [subject.sink(
            receiveCompletion:  print("paced: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") ,
            receiveValue:  print("paced: (Double(DispatchTime.now().rawValue) / 1_000_000_000) ($0)") 
            )]

        let c = erratic.connect()
        cans += [AnyCancellable  c.cancel() ]

        return true
    

Y aquí, por fin, está el resultado:

erratic: 223394.17115897 A
paced: 223394.171495405 A
erratic: 223394.408086369 B
erratic: 223394.739186984 C
paced: 223395.171615624 B
paced: 223396.27056174 C
erratic: 223399.536717127 D
paced: 223399.536782847 D
erratic: 223399.536834495 E
erratic: 223400.236808469 F
erratic: 223400.236886323 finished
paced: 223400.620542561 E
paced: 223401.703613078 F
paced: 223402.703828512 finished
  • Las marcas de tiempo están en unidades de segundos.
  • Los horarios erráticos del editor son, de hecho, erráticos y, a veces, cercanos en el tiempo.
  • Los tiempos de ritmo están siempre separados por al menos un segundo, incluso cuando los eventos erráticos ocurren con menos de un segundo de diferencia.
  • Cuando ocurre un evento errático más de un segundo después del evento anterior, el evento con ritmo se envía inmediatamente después del evento errático sin más demora.
  • La finalización con ritmo ocurre un segundo después del último evento con ritmo, aunque la finalización errática ocurre inmediatamente después del último evento errático. los buffer no envía la finalización hasta que recibe otra demanda después de enviar el último evento, y esa demanda es retrasada por el temporizador de estimulación.

He puesto toda la implementación del step operador en esta esencia para copiar / pegar fácilmente.

EDITAR

Existe un enfoque aún más simple que el original que se describe a continuación, que no requiere un marcapasos, sino que utiliza la contrapresión creada por flatMap(maxPublishers: .max(1)).

flatMap envía una demanda de 1, hasta que finalice su editor devuelto, lo que podríamos retrasar. Necesitaríamos un Buffer editor aguas arriba para almacenar los valores.

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject()
let interval = 1.0

let pub = subject
   .buffer(size: .max, prefetch: .byRequest, whenFull: .dropNewest)
   .flatMap(maxPublishers: .max(1)) 
      Just($0)
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
   

ORIGINAL

Sé que esta es una pregunta antigua, pero creo que hay una forma mucho más sencilla de implementarla, así que pensé en compartirla.

La idea es similar a una .zip con un Timer, excepto en lugar de un Timer, lo harías .zip con un “tick” retardado de un valor enviado previamente, que se puede lograr con un CurrentValueSubject. CurrentValueSubject es necesario en lugar de un PassthroughSubject para sembrar la primera “garrapata”.

// for demo purposes, this subject sends a Date:
let subject = PassthroughSubject()

let pacer = CurrentValueSubject(())
let interval = 1.0

let pub = subject.zip(pacer)
   .flatMap  v in
      Just(v.0) // extract the original value
        .delay(for: .seconds(interval), scheduler: DispatchQueue.main)
        .handleEvents(receiveOutput:  _ in 
           pacer.send() // send the pacer "tick" after the interval
        ) 
   

Lo que pasa es que el .zip puertas en el marcapasos, que solo llega después de un retraso de un valor enviado previamente.

Si el siguiente valor llega antes del intervalo permitido, espera el marcapasos. Sin embargo, si el siguiente valor llega más tarde, entonces el marcapasos ya tiene un nuevo valor para proporcionar instantáneamente, por lo que no habría demora.


Si lo usó como en su caso de prueba:

let c = pub.sink  print("($0): (Date())") 

subject.send(Date())
subject.send(Date())
subject.send(Date())

DispatchQueue.main.asyncAfter(deadline: .now() + 1.0) 
   subject.send(Date())
   subject.send(Date())


DispatchQueue.main.asyncAfter(deadline: .now() + 10.0) 
   subject.send(Date())
   subject.send(Date())

el resultado sería algo como esto:

2020-06-23 19:15:21 +0000: 2020-06-23 19:15:21 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:22 +0000
2020-06-23 19:15:21 +0000: 2020-06-23 19:15:23 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:24 +0000
2020-06-23 19:15:22 +0000: 2020-06-23 19:15:25 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:32 +0000
2020-06-23 19:15:32 +0000: 2020-06-23 19:15:33 +0000

Podría Publishers.CollectByTime ser útil aquí en alguna parte?

Publishers.CollectByTime(upstream: upstreamPublisher.share(), strategy: Publishers.TimeGroupingStrategy.byTime(RunLoop.main, .seconds(1)), options: nil)

Más adelante puedes encontrar las aclaraciones de otros administradores, tú igualmente eres capaz mostrar el tuyo si te gusta.

¡Haz clic para puntuar esta entrada!
(Votos: 0 Promedio: 0)



Utiliza Nuestro Buscador

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *