I have taken a lot of the feedback very seriously, which has led me to dive into some pretty extensive adoption experiments to understand where YieldingContinuation works and where it does not.
This is a journey of my findings.
It begins with a set of premises about what our goals should be and who the target audience is. One repeating indicator for me has been that if a type or feature ends up requiring the use of locks when using async/await, that feature has missed the mark. Locks themselves are not impossible to use in Swift, but they are fraught with pitfalls that can be quite surprising even for seasoned developers. The major need that YieldingContinuation attempts to serve is to be the intermediary between asynchronous non-async/await code and the async/await interface, just like other continuations. This means that the majority of the folks who will be using it will be those that maintain libraries - I would expect that projects like AFNetworking (just to name one popular library) would be interested in this type of functionality to bridge the gap between the existing asynchronous world and the new one. As with all APIs it does not need to solve all problems, but it ought to solve the problem space it fits into well. Some of the feedback has highlighted cases where this does not fit as nicely as I would hope, but I think if you follow this journey it should become clear that it can be fixed into a relatively safe interface that is powerful and fairly ergonomic.
Existing asynchronous sources are often not demand or back-pressure driven. This means that the state about when to execute next is held externally. YieldingContinuation does not aim to solve back-pressure or demand based systems, nor does it aim to solve the non-external-state cases. The scope, in my view, is to solve the case of "I have externally driven sources of events that I want to bring into the async/await world, those events can happen any number of times, and sometimes they can fail".
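To make that scope concrete, here is a minimal sketch of the kind of externally driven source being described; EventMonitor, Event, and its handlers are hypothetical stand-ins, not an existing API.

// Hypothetical callback-based source: events are driven externally,
// can arrive any number of times, and can end in failure. There is no
// demand or back-pressure; the monitor decides when to call its handlers.
struct Event { let payload: String }

final class EventMonitor {
  var eventHandler: ((Event) -> Void)?
  var failureHandler: ((Error) -> Void)?

  func start() { /* begins delivering events on an internal queue */ }
  func stop() { /* halts delivery */ }
}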
There are a few ways to handle external events: back-pressure, blocking, buffering, or dropping. Since we have established that we are not handling external back-pressure sources and are only handling things like callbacks that happen more than once, or informational delegation, we are left with blocking, buffering, or dropping.
Blocking is a land mine from a performance standpoint. Moreover, it does not even make sense when moving to an async/await world, since the value will be consumed from an async function that is being awaited upon; blocking the caller until the value is consumed is folly beyond just the performance/QoS concerns. The resume context can be thought of as asynchronously calling back on the executor the initial call was made on; in libdispatch parlance, the callback is a dispatch_async on the queue the originator's call was on. So even if we block until consumption, the value is already going to be out of the critical region of the source of the event. Overall this option seems like a sub-par choice and something we should actively avoid.
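As a rough illustration of that delivery pattern (the queue and handler here are made up), the value has already left the producer's critical region by the time any consumer could block on it:

import Dispatch

let sourceQueue = DispatchQueue(label: "example.event.source")

func deliver(_ value: Int, to handler: @escaping (Int) -> Void) {
  // The value is handed off asynchronously, so blocking the consumer
  // until it is "consumed" would not keep it inside the producer's
  // critical region - it would only degrade performance/QoS.
  sourceQueue.async {
    handler(value)
  }
}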
Buffering is probably the right answer in most cases. It does not block the caller. It keeps hold of the value until it is used (and then removes it from the buffer). If the buffer size is specifiable, it can devolve into dropping behavior when the buffer is limited to 0 elements. This means that this one option can represent two of the distinct behaviors we want to describe. Feedback on this pitch has already posed the question "What about buffering?". This will come into play further along the journey when it comes to the critical region of the mutating state.
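A minimal sketch (not the proposed implementation) of that buffering behavior, where a limit of 0 naturally devolves into dropping:

struct PendingBuffer<Element> {
  private var storage: [Element] = []
  let limit: Int

  init(limit: Int) { self.limit = limit }

  // Append up to the limit; with limit == 0 every element is dropped.
  mutating func push(_ element: Element) {
    guard storage.count < limit else { return }
    storage.append(element)
  }

  mutating func pop() -> Element? {
    storage.isEmpty ? nil : storage.removeFirst()
  }
}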
Another sage question was posed: "What about cancellation?". This has an interesting intersection with the backing storage of this solution, because it has a ramification that makes things a bit tangled, but the foreshadowing is that it will be fixed by the same concept as buffering.

The function withTaskCancellationHandler requires anything used in the cancellation handler to be sendable, but furthermore it requires reference semantics. NotificationCenter has an apt example of what cancellation means: if we are building an async sequence of notifications, on cancellation we want to do two things; ensure we return nil, and un-register the observer to ensure the memory is cleaned up. This pattern is repeated with numerous other cancellation concepts where the cancellation will perform cleanup tasks - which are mutations in a concurrent context. Value types cannot be mutated concurrently; the language enforces that. So that leaves things with reference semantics. The two reference types to work with are actors and classes. The initial choice of actors is sensible, since that is the fitting piece in the async/await and structured concurrency story, however there are some interesting wrinkles to consider. The most relevant one is that the cancellation would be an async function on said actor. However withTaskCancellationHandler does not accept an async handler for cancellation. One could then imagine that a new task would be detached in the handler to invoke cancel. Doing so poses a problem: the cancel action would overhang the execution of the operation fetching the next value. This means that the cancel could easily run after the operation is invoked, or worse yet be blocked by said operation if the operation is gated by the actor as well. These problems rule actors out as a logical choice. The remaining consideration for reference types is requiring adopters to use a class to implement their cancellation; however, since classes don't offer any thread-safe way of ensuring the cancellation can be done concurrently, that leaves adopters with the requirement of using locks - which, as I previously stated, is likely not a good solution due to its difficulty.
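To illustrate why the actor route falls apart, here is a sketch using a hypothetical EventSource actor and the withTaskCancellationHandler(operation:onCancel:) shape; because the cancellation handler is synchronous, the cancel has to be detached, and nothing orders it ahead of the next() it is supposed to interrupt.

// Hypothetical actor-backed source; not part of the pitch.
actor EventSource<Element: Sendable> {
  func next() async -> Element? {
    // awaits the next externally produced value
    return nil
  }
  func cancel() {
    // tears down observation and marks the source finished
  }
}

func nextValue<Element: Sendable>(from source: EventSource<Element>) async -> Element? {
  await withTaskCancellationHandler {
    // If this operation is already running on the actor, the detached
    // cancel below can be serialized behind it, or simply land too late.
    await source.next()
  } onCancel: {
    // The handler cannot await, so the async cancel must be detached.
    Task.detached { await source.cancel() }
  }
}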
Cancellation and buffering tie into shared state. Since any sort of cancel must inherently claim a concept of "finished", we must return something from said continuation, but without the value being nullable in the call to next there is nothing that could be returned. This means that we must have some sort of terminal-state tracking in this type, and that the current signatures as pitched do not work appropriately. The function next should have a nullable return value. Furthermore, some state must be tracked to ensure that values are no longer produced past the terminal state. This then extends to yielded errors as well.
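A sketch of that terminal-state tracking (names are illustrative only): once finished, yields are ignored and next would hand back nil.

// Illustrative state machine: models only the buffered/finished state,
// not the suspension machinery a real implementation needs.
enum SeriesState<Element> {
  case active(pending: [Element])
  case finished

  mutating func yield(_ element: Element) {
    guard case .active(var pending) = self else { return } // ignore values past the terminal state
    pending.append(element)
    self = .active(pending: pending)
  }

  mutating func finish() {
    self = .finished
  }

  // Returns nil when finished or when nothing is pending; a real
  // implementation would suspend instead of returning nil while active.
  mutating func take() -> Element? {
    guard case .active(var pending) = self, !pending.isEmpty else { return nil }
    let element = pending.removeFirst()
    self = .active(pending: pending)
    return element
  }
}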
So far the journey has uncovered a few key details:
- The type serves a purpose of filling the gap for callbacks and informational delegates.
- The type must incorporate some sort of buffering concept. (dropping is just a buffer of 0)
- The type should respect cancellation.
- The production side (next) should emit nil values and have a finished state.
Ideally we would want a type that enforces a concept of one source of truth. Sending values from any random context would probably not be what most developers want. Ideally it would promote an isolation-of-concerns strategy much like some of the other functions/types do. For example, withCheckedContinuation enforces isolation of concerns by only making resume available within the closure. This is reasonably similar to the resume concept we want to offer, but we also need a way to call next. So instead of returning a value from a with*-style API, we can have a block passed as part of the initialization of this type and pass in the parts that can yield/resume. The exterior only has the capability of invoking next. In truth this means that the closure's interior type is the actual continuation and the external portion is something that either is or has an AsyncIteratorProtocol.
The exterior of the type would only have the capability of accessing a next function, which in truth is congruent with the definition of AsyncSequence. This leaves us with an interesting solution - a type that could be extended in the future with different initializers that have differing behaviors, and an external type with no access other than invoking next. From an adoption standpoint, for some of the APIs I have tried out so far, this would feel just like the type AFNetworking would want - and hopefully what numerous other developers would want too. It fills the key details outlined and never requires a lock to be used.
Here is the proposed interface (some names have been tweaked to reflect the adjusted heritage):
public struct Series<Element> {
  public struct YieldingContinuation: Sendable {
    public func resume(yielding element: __owned Element)
    public func finish()
  }

  public init(
    buffering: Element.Type = Element.self,
    maxBufferedPendingElements limit: Int = .max,
    _ build: (YieldingContinuation) -> Void
  )
}

extension Series: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

extension Series.YieldingContinuation {
  public func resume(
    with result: Result<Element, Never>
  )

  public func resume() where Element == Void
}
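Assuming the interface above, adoption could look roughly like this; observeTemperature is a hypothetical callback-based API standing in for the kind of thing AFNetworking-style libraries expose.

// Hypothetical callback-based API being bridged.
func observeTemperature(_ handler: @escaping (Double) -> Void) { /* ... */ }

let temperatures = Series(buffering: Double.self) { continuation in
  observeTemperature { value in
    continuation.resume(yielding: value)
  }
}

// The exterior is just an AsyncSequence from any async context.
func logTemperatures() async {
  for await temperature in temperatures {
    print("temperature: \(temperature)")
  }
}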
Keen observers will note that this does not handle errors or throw; this is the singular drawback of this approach. Throwing must be handled with a distinct type, because adopting a protocol under more than one set of constraints is not something the language can do (and, from what I have been told, leads down a dark path of potentially unimplementable constraint solving). So a secondary type would be introduced to handle the throwing cases.
public struct ThrowingSeries<Element> {
  public struct YieldingContinuation: Sendable {
    public func resume(yielding element: __owned Element)
    public func finish(throwing error: __owned Error? = nil)
  }

  public init(
    buffering: Element.Type = Element.self,
    maxBufferedPendingElements limit: Int = .max,
    _ build: (YieldingContinuation) -> Void
  )
}

extension ThrowingSeries: AsyncSequence {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async throws -> Element?
  }

  public func makeAsyncIterator() -> Iterator
}

extension ThrowingSeries.YieldingContinuation {
  public func resume<Failure: Error>(
    with result: Result<Element, Failure>
  )

  public func resume() where Element == Void
}
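The throwing variant bridges naturally from Result-based callbacks; observeReadings is again a hypothetical stand-in.

// Hypothetical callback-based API that can deliver values or a failure.
func observeReadings(_ handler: @escaping (Result<Double, Error>) -> Void) { /* ... */ }

let readings = ThrowingSeries(buffering: Double.self) { continuation in
  observeReadings { result in
    continuation.resume(with: result)
  }
}

func logReadings() async {
  do {
    for try await reading in readings {
      print("reading: \(reading)")
    }
  } catch {
    print("readings failed: \(error)")
  }
}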
Below is a fully operational NotificationCenter battle station (named humorously so that it is clear this is an example). In this particular case I opted for a wrapper so that, upon cancellation, the observer could be removed from the center.
import Foundation

extension NotificationCenter {
  public func battleStation(_ name: Notification.Name, object: AnyObject? = nil) -> BattleStation {
    return BattleStation(center: self, name: name, object: object)
  }

  public struct BattleStation: AsyncSequence {
    public typealias Element = Notification

    public struct Iterator: AsyncIteratorProtocol {
      var observer: Any? = nil
      let center: NotificationCenter
      var iterator: Series<Notification>.Iterator

      init(_ observer: Any?, _ center: NotificationCenter, _ iterator: Series<Notification>.Iterator) {
        self.observer = observer
        self.center = center
        self.iterator = iterator
      }

      public mutating func next() async -> Notification? {
        guard let observer = observer else { return nil }
        // A nil value from the underlying Series means it has finished
        // (e.g. via cancellation), so unregister the observer and stop.
        guard let notification = await iterator.next() else {
          center.removeObserver(observer)
          self.observer = nil
          return nil
        }
        return notification
      }
    }

    var observer: Any? = nil
    let center: NotificationCenter
    let notifications: Series<Notification>

    init(center: NotificationCenter, name: Notification.Name, object: AnyObject?) {
      self.center = center
      var observer: Any?
      // Bridge the callback-based observation into the Series.
      notifications = Series(buffering: Notification.self) { continuation in
        observer = center.addObserver(forName: name, object: object, queue: nil) { notification in
          continuation.resume(yielding: notification)
        }
      }
      self.observer = observer
    }

    public func makeAsyncIterator() -> Iterator {
      return Iterator(observer, center, notifications.makeAsyncIterator())
    }
  }
}
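Consuming the example looks like any other AsyncSequence; the notification name here is made up for illustration.

// Hypothetical usage from an async context.
let exampleName = Notification.Name("ExampleDidUpdate")

func watchExampleNotifications(center: NotificationCenter = .default) async {
  for await notification in center.battleStation(exampleName) {
    print("received: \(notification.name)")
  }
}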
Thanks for following along with this long and twisting/turning journey!