[Concurrency] YieldingContinuation

Hi everyone,

We'd like to start discussion on an addition to the Concurrency library. This new type will be used for adapting existing multiple-callback functions (for example, closures or delegates) into an AsyncSequence-conforming type. We are building upon the Continuation API in SE-300.

The existing UnsafeContinuation type is designed to be resumed exactly once. For a sequence, we really want a type which can be sent values multiple times and be queried for a value multiple times.

This type can be a low-level construct (like the other Continuation API), because it is intended to be used in types which intend to provide a simple AsyncSequence-based implementation as their interface. Once we have enough low level APIs vending AsyncSequence directly, composing a new one from an existing one becomes a lot more straightforward.

The prototype implementation is here. Here is a slightly simplified API for discussion purposes:

public struct YieldingContinuation<Element, Failure: Error>: Sendable {

  /// Construct a YieldingContinuation.
  ///
  /// This continuation type can be called more than once, unlike the unsafe and
  /// checked counterparts. Each call to the yielding functions will resume any
  /// awaiter on the next function. This type is inherently sendable and can
  /// safely be used and stored in multiple task contexts.
  public init()
  public init(yielding: Element.Type, throwing: Failure.Type)

  /// Resume the task awaiting next by having it return normally from its 
  /// suspension point.
  ///
  /// - Parameter value: The value to return from an awaiting call to next.
  ///
  /// Unlike other continuations `YieldingContinuation` may resume more than
  /// once. However if there are no potential awaiting calls to `next` this
  /// function will return false, indicating that the caller needs to decide how
  /// the behavior should be handled.
  public func yield(_ value: __owned Element) -> Bool

  /// Resume the task awaiting the continuation by having it throw an error
  /// from its suspension point.
  ///
  /// - Parameter error: The error to throw from an awaiting call to next.
  ///
  /// Unlike other continuations `YieldingContinuation` may resume more than
  /// once. However if there are no potential awaiting calls to `next` this
  /// function will return false, indicating that the caller needs to decide how
  /// the behavior should be handled.
  public func yield(throwing error: __owned Failure) -> Bool

  /// Await a resume from a call to a yielding function.
  ///
  /// - Return: The element that was yielded or a error that was thrown.
  public func next() async throws -> Element
}

extension YieldingContinuation where Failure == Never {
   public init(yielding: Element.Type)
   public func next() async -> Element
}

extension YieldingContinuation {
   /// Resume the task awaiting the continuation by having it either
   /// return normally or throw an error based on the state of the given
   /// `Result` value.
   ///
   /// - Parameter result: A value to either return or throw from the
   ///   continuation.
   ///
   /// Unlike other continuations `YieldingContinuation` may resume more than
   /// once. However if there are no potential awaiting calls to `next` this
   /// function will return false, indicating that the caller needs to decide how
   /// the behavior should be handled.
   public func yield<Er: Error>(with result: Result<Element, Er>) -> Bool where Failure == Error
   public func yield(with result: Result<Element, Failure>) -> Bool
   public func yield() -> Bool where Element == Void
}

Here is an example of how to write a wrapper for NotificationCenter using YieldingContinuation:

extension NotificationCenter {
  func notifications(of name: Notification.Name, on object: AnyObject? = nil) -> Notifications {
    return Notifications(center: self, name: name, object: object)
  }
    
  struct Notifications: AsyncSequence {
    let name: Notification.Name
    let object: AnyObject?
    let center: NotificationCenter
    
    typealias Element = Notification
    func makeAsyncIterator() -> Iterator {
      Iterator(center: center, name: name, object: object)
    }
    
    actor Iterator : AsyncIteratorProtocol {
      let name: Notification.Name
      let object: AnyObject?
      let center: NotificationCenter
      
      init(center: NotificationCenter, name: Notification.Name, object: AnyObject? = nil) {
        self.name = name
        self.object = object
        self.center = center
      }
      
      let continuation = YieldingContinuation<Notification, Never>()  
      var observationToken: Any?  
      func next() async -> Notification? {
        observationToken = center.addObserver(forName: name, object: object, queue: nil) { [continuation] in
          // NotificationCenter's behavior is to drop if nothing is registered to receive, so ignore the return value. Other implementations may choose to provide a buffer.
          let _ = continuation.yield($0)
        }
        return await continuation.next()
      }
    }
  }
}

We'd like to get some feedback on how useful this would be for your own adoption of AsyncSequence. There may be room for higher level concepts build on top of this as well, which we should consider in subsequent proposals.

Thank you all for your help!

23 Likes

YieldingContinuation seems straightforwardly useful to cover over the unsafe continuation dance in places where it would be necessary.

3 Likes

Looks intriguing. It seems obvious we need something like this.

I might not be as familiar with the use cases all this concurrency work unlocks so I hope what follows does not come across as aggressive, that is not my intention. I think it is important to consider the higher level use cases and see how lower level API are solutions for those. Right now, it seems like solutions looking for problems. Are you able to offer a few examples of what higher level code would look like?

So, say I’m observing notifications for the application lifecycle events like did become active to refresh a table of favourite soups from an API. How would I use this iterator to observe every notification? How do I control the lifetime of the observation? Would I use this with or without a publisher? What problems does it solve that reactive programming doesn’t already solve?

1 Like

How would buffering work with YieldingContinuation as is though?

There is no notification upon a consumer starting to await on next(). So it seems impossible for a producer to yield values from the buffer in reaction to the consumer making itself available. Without such notifying mechanism, one has to rely on most likely polling yield() regularly. This does not seem optimal in the async world, judging by established async implementations in other languages.

It feels like YieldingContinuation is missing something like func waitForAvailableConsumer() async, which the sender can use to await on a consumer successfully awaiting next(). A buffering iterator can then use it to responsively yield a buffered value, and subsequently wait for the next opportunity to yield.

Otherwise, I am struggling to imagine how high level primitives like Channels can be built on top of YieldingContinuation as envisioned. For example, if we use Kotlin Coroutines’ Channel as the point of reference, YieldingContinuation — without the backward notifying — is sufficient only for a rendezvous (no buffering) channel that always drops incoming values when the consumer is unavailable. Any other buffering strategy seems off the table, since they all need to be made aware of the consumer being available again.

2 Likes

So YieldingContinuation itself does not buffer. However it is intended to be a low level piece that allows you to build your own buffers with it.

For example if you have an actor you can easily build a buffered send as such:

actor Buffered<Element> {
  let continuation = YieldingContinuation(yielding: Element.self)
  var buffer = [Element]()
  
  func push(_ element: Element) {
    if !continuation.yield(element) {
      buffer.append(element)
    }
  }
  
  func pop() async -> Element {
    if buffer.count > 0 {
      return buffer.removeFirst()
    }
    return await continuation.next()
  }
}

This means that you can manage how the buffering is done; dropping other such limiters can be then approached accordingly. This was why the concept of the yielding functions on the continuation return a boolean to indicate if the continuation consumed the value as a return to an awaiting call or not.

6 Likes

I think we need something like this for easier adoption of the async/await feature - so a big +1 from me.

What I do not like about the proposed api is the impedance mismatch to the AsyncSequence protocol:

  1. There is not build in buffering.
  2. The requirement of AsyncIteratorProtocol that after an error only nil must be returned is not handled.

This is an inconvenient middle ground - I would much prefer that YieldingContinuation handles these cases (at least as an option). If you really need to go low level (and provide your custom queue handling), you can simply use the Continuation api and build your own yielding version.

Greetings,
Johannes

1 Like

This would be very useful to bridge different streaming APIs. I was having trouble wrapping my head around bridging Swift-NIO's ChannelHandlers to an async sequence in order to stream post bodies of an HTTP request. This is extremely useful for that and provides the necessary hooks to provide backpressure.

1 Like

To be clear, the existence of the lower level primitive does not preclude a higher level implementation that handles these types of things. As a matter of fact; in order to build higher level items we kinda need some lower level pieces like this to make it potentially happen. This was an alternative avenue that was explored when building the drafts for this type - but the result is that we found that the higher level type required a lot more careful thinking in the regards to how the buffering worked, what "knobs and dials" it needed and the memory/perf impact it might have. The lower level type was definitely pretty clear cut on what it needed to do to get the job done, the higher level type not so much yet.

Being that buffering is pretty trivial w/ an actor it might be something that we discover that is not needed or something that can be composed trivially

3 Likes

This is precisely the application of this type we were hoping for. NIO really makes a lot more sense as async functions instead of "future" like types, and AsyncSequences instead of ChannelHandlers in my opinion. I could see this type allowing for leveraging of some more than just cursory adoption for NIO.

4 Likes

I think it's also important to note that implicit unlimited buffering without any form of backpressure is gnenerally an anti-feature, since it makes it way too easy to hide bottlenecks. I think the proposed design of returning a Bool is the right low-level mechanism that allows for a variety of different policies to be implemented when integrating with Cocoa, NIO, and other existing systems.

3 Likes

My biggest high-level concern with this design is that it leaves you with absolutely no way to know what task/thread is going to unblock the waiting task. Therefore, it subverts some of the central goals of structured concurrency; for example, there's no way to directly avoid priority inversions if the waiting task increases in priority. This is a general problem with unsafe continuations, but unsafe continuations are a low-level escape from the model and are necessary for interacting with other models; moreover, we are going to have to aggressively pursue workarounds to the unsafe continuation problem, such as tracking tasks through blocks that capture their continuations. In contrast, this feels like a higher-level building block, but it's one that's going to naturally result in libraries that don't interact with structured concurrency very well. It will simply become a design barrier to ever doing the right thing, the same way that condition variables are a design barrier with threads.

Also, I'm not sure how this is meant to interact with cancellation and timeouts. A task that's currently registered as the waiter in the atomic object can simply replace itself with nothing; there may be a lost race there, but that's inevitable. There is no API to do such a removal right now, but surely one could be added. However, if a second task comes along and starts waiting on the continuation, it simply remembers the currently-stored continuation, and there's no way to tell this task to drop its reference. This problem goes away if you can disallow multiple concurrent waiters, possibly as a high-level requirement on users. But if you don't disallow multiple concurrent waiters, you're going to need a radically different and probably less efficient structure to allow for such cancellation.

A third problem is that this doesn't seem to allow for move-only result types. We don't have move-only types in the language right now, but it'd be nice if core primitives don't have unnecessary problems with them. UnsafeContinuation doesn't have a problem with move-only results, but this feature would because the same result might need to be passed to multiple waiters. This is another problem that would go away if you disallowed multiple concurrent waiters.

8 Likes

Although it presents as maybe a higher-level API, I think that at least part of the motivation here remains to interoperate with existing APIs that use multi-shot callbacks, delegates, target-action, etc. to represent event streams, meaning it is also a lot like UnsafeContinuation in that it needs to be straightforward to use with those kinds of APIs.

2 Likes

For a low-level api, this is fine, IMO.

In many cases, specialized queue handling just isn't important, especially if you just need to bridge the gap between a high-level callback-based API and async/await. But as you said, that's probably a different proposal then.

The concern about the blocking task/thread is the same concern that will happen with the withUnsafeContinuation API since under the hood the call to next will be doing that same thing. So for whatever solution we find for other continuations will apply here. So I think the natural solutions will fall out in the same manner, the one advantage here is that we do have a bit of extra info - we have the context in which the underlying storage has escaped to make inference on what QoS the production will be when it is awaiting; granted that is a slightly more hand-wavy thing than say pthread_overrides which have concrete tokens from a given potential blocker thread.

Per the interaction with cancellation and timeout - this will rely as much as other continuations on those. In other words, it will rely on the caller to do the proper thing with regards to task cancellation or timeouts (which I don't think exist yet?) - the onus on the developer for cancel would be to properly resume a terminal signal upon that yield. Just as withUnsafeContinuation or any other async callout that may last a good period of time that wants to respect cancellation you need to use Task.withCancellationHandler wrapped around it.

Per the case of move-only types - after discussing this point a bit more it seems that leaving it a structure would be the best route to handle the potential future of move-only.

1 Like

I have taken alot of the feedback very seriously; which has led me to dive into making some pretty extensive adoptions to understand where YieldingContinuation works and does not.

This is a journey of my findings.

It begins with a set of premises about what our goals should be and who the target audience is. One repeating indicator for me has been that if a type or feature ends up requiring the usage of locks when using async/await that feature has missed the mark. Locks themselves are not impossible to use in Swift but they are fraught with pitfalls that can be quite surprising even for seasoned developers. The major need that YieldingContinuation attempts to serve is to be the intermediary between asynchronous non async/await code and the async/await interface; just like other continuations. This means that the majority of the folks who will be using it will be those that maintain libraries - I would expect that projects like AFNetworking (just to name one popular library) would be interested in this type of functionality to bridge the gap between the existing asynchronous world and the new. As with all APIs it does not need to solve all problems but it ought to solve the problem space it is fit into well. Some of the feedback has highlighted some cases where this does not fit as nicely as I would hope, but I think if you can follow this journey it should be clear that it is fixable to be a relatively safe interface that is powerful and pretty ergonomic.

Existing asynchronous sources are often not demand or back-pressure driven. This means that the state is held externally about when to execute next. YieldingContinuation does not aim to solve the back-pressure or demand based systems, nor does it aim to solve the non-external state cases. The scope in my view is to solve the "I have externally driven sources of events that I want to bring into the async/await world and those events can happen something other than 1 times and sometimes they can fail".

There are a couple of ways to handle external events; back-pressure, blocking, buffering, or dropping. Since we have established we are not handling the external back-pressure sources and only handling things like callbacks that happen more than once, or informational delegation we are left with either blocking, buffering or dropping.

Blocking is a land mine from a performance standpoint. But moreover, it does not even make sense when moving to an async/await world since the value will be consumed from an async function that is being awaited upon; this means that the concept of having the caller be blocked until the value is consumed is folly not just for the performance/QoS sense of things. The resume context can be thought of as it is asynchronously calling back on the executor that the initial call was made; in parlance of libdispatch - the callback is dispatch_async on the queue that the originator's call was on. So even if we block until consumption the value is going to be out of the critical region of the source of the event. Overall this option seems like a sub-par choice and something we should actively avoid.

Buffering is probably the right answer in most cases. It does not block the caller. It keeps a hold of the value until it is used (and then removed from the buffer). If the buffer size is specifiable it can devolve into a dropping behavior if the buffer size is limited to 0 elements. This means that this option can represent two distinct possibilities of the options that we want to describe. Feedback on this pitch has already posed the question "What about buffering?". This will come into play further along the journey when it comes to the critical region of the mutating state.

Another sage question was posed of "What about cancellation?". This has an interesting intersection when it comes to the backing storage of this solution because it has a ramification that makes things a bit tangled, but the foreshadowing is that it will be fixed by the same concept as buffering. The function withTaskCancellationHandler requires anything used in the cancellation handler to be sendable but furthermore it requires reference semantics. NotificationCenter has an apt example of what cancellation means; if we have an async sequence we are building of notifications we want to do two things; ensure we return nil and un-register the observer to ensure cleanup of memory. This pattern is repeated with numerous other cancel concepts where the cancellation will preform cleanup tasks - which are mutations in a concurrent context. Structural types cannot be mutated concurrently as an enforcement by the language. So that leaves things with reference semantics. The two reference types to work with are actors and classes. The initial choice of actors is sensible since that is the fitting piece in the async/await and structured concurrency solution, however there are some interesting wrinkles to consider. The most relevant one is that the cancellation would be an async function on said actor. However withCancellationHandler is not async in the handler for cancellation. One could then imagine that a new task would be detached in the handler to invoke cancel. Doing so would pose a problem. The cancel action would overhang the execution of the operation fetching the next value. This means that the cancel could easily be done after the operation is invoked, or worse yet be blocked by said operation if the operation is gated by the actor as well. These problems rule actors out as a logical choice. The remaining consideration of reference types is requiring adopters to use a class to implement their cancellation, however since the class mechanisms don't offer any thread safe ways of ensuring the cancellation is able to be done concurrently it leaves the requirement of using locks. Which I previously stated is likely not a good solution due to its difficulty.

Cancellation and buffering tie into a shared state. Since any sort of cancel must inherently claim a concept of "finished" we must return something from said continuation but without the value being nullable in the call to next there is nothing that could be returned. This means that we must have some sort of terminal state tracking in this type. This means that the current signatures as pitched do not work appropriately. The function next should have a nullable return value. Furthermore some sort of state must be tracked to ensure that values then are no longer produced past the terminal state. This then can be extended to errors being yielded as well.

So far the journey has uncovered a few key details:

  1. The type serves a purpose of filling the gap for callbacks and informational delegates.
  2. The type must incorporate some sort of buffering concept. (dropping is just a buffer of 0)
  3. The type should respect cancellation
  4. The production side (next) should emit nil values and have a finished state

Ideally we would want a type that enforces a concept of one source of truth. Sending values from any random context would probably not be what most developers want. Ideally it would promote an isolation of concerns strategy much like some of the other functions/types do. For example; withCheckedContinuiation enforces isolation of concerns by only having the resume available within the closure. This is reasonably similar to the resume concept we want to offer but we also need a way to call next. So instead of returning a value from a with* style API instead we can have a block passed as part of the initialization of this type and pass in the parts that can yield/resume. The exterior only has the capability of invoking next. In truth this means that the closure interior type is the actual continuation and the external portion is something that either is or has an AsyncIteratorProtocol.

The exterior of the type would only have the capability of accessing a next function, which in truth is congruent with the definition of AsyncSequence. This leaves us with an interesting solution - a type that could be extended in the future with different initializers that have differing behaviors and an external type with no access other than invoking next. Which from an adoption standpoint for some of the APIs I have tried out so far would feel just like the type AFNetworking would want, and hopefully numerous other developers too. It fills the key details outlined and does not require a lock to ever be used.

Here is the proposed interface (some names have been tweaked to reflect the adjusted heritage):

public struct Series<Element> {
  public struct YieldingContinuation: Sendable {
    public func resume(yielding element: __owned Element)
    public func finish()
  }

  public init(
    buffering: Element.Type = Element.self, 
    maxBufferedPendingElements limit: Int = .max, 
    _ build: (YieldingContinuation) -> Void
  )
}

extension Series: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

extension Series.YieldingContinuation {
  public func resume(
    with result: Result<Element, Never>
  )

  public func resume() where Element == Void
}

Keen observers will note that this does not handle errors or throw, this is the singular drawback of this approach. Throwing must be handled with a distinct type due to the adoption of protocols on more than one constraint not being something doable in the language (and from what I have been told leads down a dark path of potentially unimplementable constraint solving). So this will mean that a secondary type would be introduced to handle the throwing cases.

public struct ThrowingSeries<Element> {
  public struct YieldingContinuation: Sendable {
    public func resume(yielding element: __owned Element)
    public func finish(throwing error: __owned Error? = nil)
  }

  public init(
    buffering: Element.Type = Element.self, 
    maxBufferedPendingElements limit: Int = .max, 
    _ build: (YieldingContinuation) -> Void
  )
}

extension ThrowingSeries: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async throws -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

extension ThrowingSeries.YieldingContinuation {
  public func resume<Failure: Error>(
    with result: Result<Element, Failure>
  )

  public func resume() where Element == Void
}

Below here is a fully operational NotificationCenter battle station (named funny so that it is clear this is an example). In this particular case I opted for wrapping so that upon cancellation the observer could be removed from the center.

extension NotificationCenter {
  public func battleStation(_ name: Notification.Name, object: AnyObject? = nil) -> BattleStation {
    return BattleStation(center: self, name: name, object: object)
  }

  public struct BattleStation: AsyncSequence {
    public typealias Element = Notification
    
    public struct Iterator: AsyncIteratorProtocol {
      var observer: Any? = nil
      let center: NotificationCenter
      var iterator: Series<Notification>.Iterator

      init(_ observer: Any?, _ center: NotificationCenter, _ iterator: Series<Notification>.Iterator) {
        self.observer = observer
        self.center = center
        self.iterator = iterator
      }

      public mutating func next() async -> Notification? {
        guard let observer = observer else { return nil }
        guard let notification = await iterator.next() else {
          center.removeObserver(observer)
          self.observer = nil
          return nil
        }
        return notification
      }
    }

    var observer: Any? = nil
    let center: NotificationCenter
    let notifications: Series<Notification>

    init(center: NotificationCenter, name: Notification.Name, object: AnyObject?) {
      self.center = center
      var observer: Any?
      notifications = Series(buffering: Notification.self) { continuation in
        observer = center.addObserver(forName: name, object: object, queue: nil) { notification in
          continuation.resume(yielding: notification)
        }
      }
      self.observer = observer
    }

    public func makeAsyncIterator() -> Iterator {
      return Iterator(observer, center, notifications.makeAsyncIterator())
    }
  }
}

Thanks for following along with this long and twisting/turning journey!

11 Likes

AFNetworking

I think you mean Alamofire. :smile:

Throughout the various concurrency pitches applying their patterns to Alamofire has always been at the front of my mind. However, I have yet to gain a clear picture of what the feature’s designers envision for the usage I need. And I’m more confused as to how something like AsyncSequence would apply at all.

The vast majority of Alamofire’s requests return a single value in the end, whether it be Data in memory or a URL to a file on disk. In those situations, there are aspects of the API which may have multiple values, like progress callbacks, URLRequest creation, or other events, but it’s not clear how those could become sequences. Of course, things like our DataStreamRequest or the upcoming WebSocketRequest are more obvious applications of AsyncSequence, similar to the NotificationCenter example you gave, but I haven’t experimented with adapting them yet. So could you be more specific about how AsyncSequence might apply here?

Your mention of locking in the context of cancellation also intrigues me, but it’s unclear how AsyncSequence would apply here as well. Currently Alamofire must go to great pains to ensure cancellation (and other state updates) is performed while locked, as it must interrupt any work in progress and cannot simply be enqueued behind other work. It’s critical that the very next access of the state after cancel() has been called reflects the mutation, otherwise we run into rather tricky race conditions between the state update and work that is already enqueued. This is true within Alamofire as well as from user code. I have yet to see a satisfactory solution to this problem using the new concurrency features. I could make everything async but even that doesn’t solve the problem of having to make the state change before enqueued work is allowed to continue. It seems the concurrency story must offer a lock or a way of prepending work to an executor. I would guess that AsyncSequence only comes into play here if I offer such an adaptor for the multiple return APIs like DataStreamRequest and must pass the cancellation of the sequence through to the request. (As an aside, I’d still want to return the completion event through the sequence in the cancellation case, but that doesn’t seem possible, for similar reasons as Combine. Perhaps I could return an error that encapsulates the value? It would need to be async.)

Any insights you could provide here would be valuable.

My apologies, I’ve been looking through tons of popular (or previously popular) libraries. And yes Alamofire was one that I was more so thinking about. There is opportunity to instead of thinking of things as just an async function to also consider websocket style connections that deliver values over time. The details are besides the point - many values over time do happen (in less regularity than async functions but it is definitely non zero and something to be considered).

Per the case of cancellation the mutation on the iterator post value delivery in my notification example has the side effect of cancelling the active registration (something that is idempotent and thread safe) which I would guess might be similar to other cases. Perhaps I should take the time to put up an example more along those lines not dealing with notifications.

Ok, so I took a bit of time this morning to dig into the cancellation story in Alamofire; first let me show you what the Series equivalent of the publishers that are there:

extension DataStreamRequest {
  var dataStream: ThrowingSeries<Data> {
    return ThrowingSeries(buffering: Data.self) { continuation in
      responseStream { stream in
        /// TODO: Cancellation
        switch stream.event {
        case .stream(let result):
          continuation.resume(with: result)
        case .complete(let completion):
          if let failure = completion.error {
            continuation.finish(throwing: failure)
          } else {
            continuation.finish()
          }
        }
      }
    }
  }
}

I think that turns out pretty nicely. But that does not count for cancellation; it would be reasonable to have the continuation have a handler for cancellation from within the construction. So at that TODO comment if it had something as simple as continuation.onCancel = { stream.cancel() } that might be possible.

I think it is a fair request that cancellation is part of this story more so than "just handle the nil case". In that scenario it could even be apt to your request of thrown errors to disambiguate normal termination and cancellation via something like continuation.onCancel = { stream.cancel(); continuation.resume(throwing: CancellationError()) }

Now of course this is something that would have to be played with to make sure it is fully the right behavior but I think the shape overall is close to being something that is considerably easier to write and considerably easier to get correct than needing to muck about with locking and atomic state and demand.

6 Likes

IMO any Reactive/Event stream based approach like RxSwift, Combine Publishers and etc could be imagined like async sequences.

Quite true; the one major differential is that with other react/event stream based approaches is that AsyncSequence can leverage language fundamentals to avoid the need for locks. The Series implementation is quite similar to PassthroughSubject from Combine (except that Series can act as a return type since there are no ways to send values but from the construction), or like Flow from Kotlin. The existing "prior art" has precedence and leads credibility as that being the right solution.

4 Likes