Are there continuations for synchronous code?

Specifically, I want an iterator for a container that would be better using a yielding interface. I just discovered the AsyncStream generic type that lets sources yield their elements upstream. The problem is that AsyncStream is asynchronous while IteratorProtocol is synchronous.

Is there a synchronous version of this system? Or is there a way the two types can be linked? My container type is a tree-like structure. So I would pass to my root node the continuation and a true flag, within that I pass the continuation and a false flag to the left sub-tree, yield the direct element, pass the continuation and a false flag to the right sub-tree, then finish the continuation. The tree type is an immutable value type (so changes are implemented by computing the after-state then do wholesale replacement).

2 Likes

When writing AsyncSequence we have an AsyncIterator, a type that conforms to AsyncIteratorProtocol. To do that, we either write our own custom type that conforms to AsyncSequence, or we use the concrete AsyncStream (generally to wrap some existing asynchronous code into an AsyncSequence).

This is modeled after the corresponding Sequence/Iterator/IteratorProtocol pattern.

If I had some tree structure that I wanted to navigate, I would probably add Sequence conformance. And you’d then have a for loop that iterates through that Sequence, and you can then do whatever you want inside that loop (including either synchronous or asynchronous code). If that for loop is doing stuff that is asynchronous, that gives you your continuations.

In answer to your question, you can link the two types together, using async from the Swift Asynchronous Algorithms. That creates an AsyncSequence from a Sequence. But one would generally only do that if you had some routine that was expecting an AsyncSequence and all you had was a normal synchronous Sequence. That seems unnecessary in this case, if I understand your scenario correctly. But that is the way to bridge a Sequence to an AsyncSequence.

You say you want to “pass the continuation”, but I must confess that I am not understanding what problem this is solving. Generally, I just make my custom type conform to Sequence and iterate through that, doing whatever synchronous or asynchronous work that is necessary.

1 Like

I'm going the other way. I want to present a Sequence while internally using an AsyncStream.

If it truly is an AsyncStream internally (i.e., that the work generating the sequence truly is asynchronous), you would generally just expose it as the AsyncSequence that it is. That would enjoying the basic same interface that that you get with Sequence except a Swift concurrency rendition. Is there some reason the exposed interface must be a Sequence rather than an AsyncSequence?

Setting that aside, trying to wrap a AsyncSequence (like AsyncStream is) in a Sequence (yielding the values as they are generated, rather than at the end) would be a serious anti-pattern (a blocking, synchronous interface). Now, obviously, you could build an Array (or any such Sequence) from the AsyncSequence, but only return this Sequence when the entire AsyncSequence is complete, but I’m wagering that is not your intent.

Or are you saying that the work going through your tree structure is really synchronous and you are looking for something for Sequence that is analogous to what AsyncStream is for AsyncSequence?

@CTMacUser seems to be posing this core question: How to get data from async world into sync world?

I can start a Task from a synchronous function, I can cancel that task from that function, but I can't get back any data. I find this quite disappointing to say the least.

1 Like

I’m sympathetic to the general question as we are used to doing things the pre-async way and/or are trying to integrate into legacy codebases and synchronous contexts.

At the risk of being provocative, let me rephrase this in light of the OP’s original question: “We’ve got a whole system designed to let us do work asynchronously without blocking the current thread, but how do we use this and block the current thread?” The answer is that you simply don’t.

What you do is refactor the synchronous code to gracefully bridge to this asynchronous code. The typical pattern is:

  • update the UI to designate that something async is starting
  • start the async work
  • optionally, as the async work progresses, update the UI periodically so the user knows what’s going on
  • when the async work is done do final update of the UI

But we don’t try to make an AsyncSequence behave synchronously. We just update our UI asynchronously.

I’m not sure if I follow you. E.g., here is a UIKit/AppKit example where the async work just returns the final result:

var task: Task<Void, Never>?

@IBAction func didTapStart(_ sender: Any) {
    task = Task {
        let result = await work()          // get data back from `async` function that returns results
        updateUI(with: result)
    }
}

@IBAction func didTapStop(_ sender: Any) {
    task?.cancel()
}

Or, if the async work reports progress with an AsyncSequence:

@IBAction func didTapStart(_ sender: Any) {
    task = Task {
        let sequence = workSequence()      // create `AsyncSequence` that yields values as work progresses; obviously, this could be an `AsyncStream`
        for await update in sequence {
            incrementalUpdateUI(with: update)
        }
    }
}

Or, obviously, one might prefer ObservableObject/@Observation patterns. But my point is that you can get data back from your async work.

FWIW, this “bridge from asynchronous code to synchronous context” problem is no different than it was with completion handler closures. (But, IMHO, we’re in a far better place with async-await than we were with closure-based patterns.)

So, you say “you can’t get back any data.” Maybe I am not understanding your observation, because we get data back all the time. Maybe you can give a concrete example.

3 Likes

I'll add to Robert's answer that there's two different interpretations of "How to get data from async world into sync world?":

  1. "I need to collect the results of async work in a sync context but it doesn't need to happen immediately" — e.g. the traditional "Dispatch.async work to a global queue, then Dispatch.async back to the main queue"; that's just a completion you call at the end of your async work, and Robert covers that
  2. "I need to collect the results of async work in a sync context and I can't make progress in this context until I have a result" — this one is much trickier, because the only way to do this is to block the current thread, and there's no safe way to do that in general

Swift doesn't offer tools to block a sync context on an async context because how you do that is extremely context-sensitive:

  1. You can't block the main thread in UI contexts because your UI will become unresponsive
  2. You can't block a thread in the cooperative thread pool because you're tying up other async work, and may even deadlock yourself
    • There's no mechanism at the moment for knowing whether a given thread is part of the cooperative thread pool, so there isn't a great way to deal with this either
  3. You can block a thread you entirely own, but if you're on your own thread, it's pretty rare that you're also doing synchronous work that needs to be blocked on async work

The only safe way to do (1) or (2) is to not truly block the thread, but to let other work continue to make progress while you wait, and this is really tricky to do right. On Darwin platforms, you can continue, for example, to spin the current thread's RunLoop to allow other work to come in and happen while you wait, but this only works because RunLoops are re-entrant, and you need to be very careful about coordinating state.

The vast majority of the time, if you think you've found yourself wanting (2), think very carefully about whether this is actually what you need, and do your best to refactor to (1).

(There is one place in my current codebase that truly requires (2) because of specific UIKit main-thread constraints, and the only way around it is to put up a progress HUD and continue spinning the main runloop so the app continues to respond to input while work is being done, with a tight deadline for cancellation, and the solution is very much not general-purpose code.)

So if you really want to, e.g., wrap an AsyncStream in a synchronous Sequence interface, you need to figure out what calling Sequence.Iterator.next() will do to the current thread while AsyncStream.Iterator.next() is running asynchronously.

5 Likes

It doesn’t seem like you’re doing any asynchronous computation - the continuation is just a convenient way to emit values as you walk the tree, and could be replaced with a callback closure.

It is possible to transform this in to an Iterator, but sometimes it can be complex to write because you need to maintain a bunch of state between emitted values. Generator functions are a feature of other languages which allow you to write these kinds of sequences more naturally, similar to how you would with a continuation. I hope that we will have them in Swift as well before too long.

Python provides generator functions as a convenient shortcut to building iterators.

def firstn(n):
   num = 0
   while num < n:
       yield num
       num += 1

Note that the expression of the number generation logic is clear and natural. It is very similar to the implementation that built a list in memory, but has the memory usage characteristic of the iterator implementation.

https://wiki.python.org/moin/Generators

5 Likes

Yes. I'm not doing any network nor GUI work. The iterator interface is bad for binary tree visitation; I'm using AsyncStream just to convert a yield interface to an iterator one.

thank you, @robert.ryan.

This is what I mean by “I can’t get back any data."

Given:

// w yields a value >= 0
func w (_ u: Int) -> Int 

func g (_ u: [Int]) -> Int

I wanted to go from this:

func f () -> Int {
    let r1 = w (0)
    let r2 = w (1)
    let r3 = w (2)

    return g ([r1, r2, r3])
}

To this:

func f () -> Int {
    let t = Task {
        let r1 = w (0)
        let r2 = w (1)
        return (r1, r2)
    }
    let r3 = w (2)
    
    // Can't do this in a synchronous function
    let tr = await t.value
    // 'async' property access in a function that does not support concurrency

    return g ([tr.0, tr.1, r3])
}

So, I ended up doing this:

func f () -> Int {
    // Task results, -1 -> result not set
    var o: (Int, Int) = (-1, -1)
    print (#function, o)
    
    Task {
        o.0 = w (0)
        o.1 = w (1)
    }
    let r3 = w (2)
    
    // wait for the task results
    repeat {
        let s = DispatchSemaphore (value: 0)
        _ = s.wait (timeout: .now() + .milliseconds(5))
        print (#function, o)
    }
    while o.0 == -1 || o.1 == -1

    return g ([o.0, o.1, r3])
}

Before continuing and to reiterate what other have said, this not a Good Idea™ so if you have any other way to solve this, do it that other way.

That saidm, I was required to solve the same problem a few years ago - I had to respond to an externally defined synchronous delegate method using asynchronous code.

The "solution" I came up with was based around a value passing semaphore:

func withAsync<T>(_ body: @escaping () async throws -> T) throws -> T {
    let semaphore = ValueSemaphore<Result<T,Error>>()
    Task {
        do {
            semaphore.signal(.success(try await body()))
        } catch {
            semaphore.signal(.failure(error))
        }
    }
    return try semaphore.wait().get()
}

public class ValueSemaphore<Value> {
    private let semaphore = DispatchSemaphore(value: 0)
    private var _value: Value!

    public init() {}

    public func signal(_ value: Value) {
        _value = value
        semaphore.signal()
    }

    public func wait() -> Value {
        semaphore.wait()
        return _value
    }
}

You can use withAsync() like this:

let value: String = try withAsync {
    try await Task.sleep(for: .seconds(5))
    return "foo"
}

print(value)

You should note that I just wrote withAsync to show some generic usage, my actual solution used the Task and ValueSemaphore directly to maintain as much control as possible in this indavisable situation.

I think what @CTMacUser was asking for was a way to write synchronous iterators that is as convenient as AsyncStream's continuation. The whole async -> sync business stems from an idea that you could use an AsyncStream regardless and "re-syncify" it, just to make it easier to write. I think we can all agree that this isn't a great implementation strategy to begin with.

Just to do something simple, you could use a closure parameter. For instance, try implementing forEach - sometimes it can be simpler than implementing a full iterator type:

extension MyDataType {

  func forEach<E>(_ yield: (Element) throws(E) -> Void) throws(E) {
    var n = 0
    while n < 10 {
      try yield(n)
      n += 1
    }
  }

}

This can often get the job done, but it lacks a lot of the features of iterators - it doesn't work with for loops, doesn't have built-in methods like map or lazy wrappers, etc.

Fundamentally though, if you can write a forEach, you can write an iterator that does the same thing. It just might need some careful design.

The name of the feature that automates this transformation is generator functions. This model of defining sequences also has a lot of other benefits, such as naturally supporting non-copyable and non-escaping element types in a way that is easy to compose, and scaling nicely to async sequences, throwing synchronous sequences, and more. They are well established in Python, Javascript, and other languages, and they have even more potential in Swift so I think it's inevitable that we'll add them one day.

More broadly, what is interesting is that this kind of gives you a general mechanism to guide callers to a set of values in-place, without needing to copy or rearrange them at all and incorporating conditional logic that is only lazily evaluated. In a sense, your code becomes the collection:

struct Address {
  var street: InlineArray<3, String>
  var city: String
  var country: String
  var message: String?

  var allLines: many String {
    // These strings could be stored anywhere. We don't copy them;
    // we take the caller by the hand and guide them to these values in-place.

    for line in street {
      yield line
    }
    yield city
    yield country
    if let message {
      yield "Note: \(message)"
    }
  }
}

let myAddress: Address = ...

for line in myAddress.allLines {
  print(line)
}

There are no collections here, no copying values in to an Array or other special storage required for iteration. I think this is super cool.

5 Likes

@ibex10 – OK, I get what you are saying. Is it fair to rephrase this as “how to synchronously wait for a result from an asynchronous Task”?

Assuming so, consider:

So, there are a few choices:

  1. Let’s first acknowledge the obvious alternative: If introducing Swift concurrency, I would be inclined to make f an async function. Then it just works. Obviously, one might have to refactor the caller to adopt Swift concurrency patterns, too, if it doesn’t already. But if you migrate the whole code unit to adopt Swift concurrency, you can avoid introducing anti-patterns within your brand-new, Swift concurrency code.

    FWIW, this is slightly different than the OP’s scenario, where AsyncStream (an AsyncSequence) was introduced in an inherently synchronous process merely for the sake of convenience of the AsyncStream interface. In this original scenario, I personally would just write a synchronous BFS or DFS Iterator. Sure, the frameworks don’t provide one out of the box, but it’s a thoroughly well researched set of algorithms. I hesitate to delve into that, though, as the OP (somewhat understandably) doesn’t seem to want to go there and choosing the right algorithm involves diving into a ton of implementation details (e.g., there are some BFS/DFS algorithms that are elegantly simple but not terribly memory efficient, there are slightly more complicated algorithms that tradeoff memory issues for some tree-traversal overhead, and there are even more complicate ones that address that, too; bottom line, there are a bunch of memory/performance/complexity tradeoffs that will dictate the final choice).

  2. In your example, you appear to have a series of synchronous functions in which you want w(0) and w(1) to run sequentially with respect to each other, but you want them to run in parallel with w(2), and you want to block the thread (!!!) until all three finish. If that’s the case, I personally would fall back to GCD patterns:

    func f() -> Int {
        // Might this take more than a few milliseconds? If so, you may
        // want to make sure you never do this synchronous work on the
        // main queue.
        //
        // dispatchPrecondition(condition: .notOnQueue(.main))
    
        let group = DispatchGroup()
        let queue = DispatchQueue.global()
        
        nonisolated(unsafe) var r0: Int! // `nonisolated(unsafe)` to silence Swift thread-safety warnings; the programmer is vouching for its thread-safety
        nonisolated(unsafe) var r1: Int!
        nonisolated(unsafe) var r2: Int!
        
        queue.async(group: group) {
            r0 = w(0)
            r1 = w(1)
        }
        
        queue.async(group: group) {
            r2 = w(2)
        }
        
        group.wait()
        
        return g([r0, r1, r2])
    }
    

    The above is fine if the number of work items is very limited. If you had tons of parallel work items, you would probably reach for concurrentPerform, or other similar legacy patterns, to mitigate problems arising from thread-explosion scenarios.

    This avoids the inefficiency of spinning while you wait for the results.

    I offer this example with a significant caveat: Writing slow and synchronous code with thread-blocking characteristics should almost always be avoided, where possible. Sometimes we are dealing with some third-party library that only offers synchronous interfaces, so one has to do what one has to do, but in our own codebase, we would almost always avoid this pattern. (See point 1, above.)

    Now, some will quickly retort, “don’t mix GCD with Swift concurrency!” This has been repeated so often that it has been taken as gospel. While that’s generally good counsel (one really should avoid inserting GCD unnecessarily into a Swift concurrency codebase), Apple has been specific that if you’re doing things that can’t honor our contract with the Swift concurrency system, you might keep that out of its cooperative thread pool, altogether. E.g., in Visualize and optimize Swift concurrency they explicitly give an example where they explicitly advise moving that code into a dispatch queue (and bridging back to Swift concurrency with a continuation, if necessary).

    In short, if you cannot honor the contract with the Swift concurrency system, think hard about where it belongs there at all. With no offense, introducing semaphores or spinning/polling is generally code smell.

  3. For the sake of completeness, if one was engaged in some process that was doing something computationally intensive within the Swift concurrency system, one might periodically await Task.yield() in the loop to make sure sure we let other, higher-priority tasks a chance to run. It likely is not applicable in your use-case (it is intended for scenarios where you are in an asynchronous context but doing something that might prevent the thread from making “forward progress”), but I feel compelled to acknowledge it in scenarios whenever I see spinning in a loop with Swift concurrency.

If you really want to explore these scenarios in more detail, we might want to move this discussion to a different question/forum, so we don’t further cloud the already confusing exchange present on this thread. We should probably let this thread focus on CTMacUser’s question.

1 Like

Actually, here's the specific answer to your question: coroutines of the form found in Python only exist using Swift concurrency. They can however be replicated using the mechanism below:

let seq = sequence { continuation in
    continuation.yield(1)

    continuation.finish()
}

for i in seq {
    print(i)
    if i > 10 {
        break
    }
}

public func sequence<T>(_ elementType: T.Type = T.self, bufferingPolicy limit: AsyncStream<T>.Continuation.BufferingPolicy = .unbounded, _ build: (AsyncStream<T>.Continuation) -> Void) -> UnfoldSequence<T, Void> {
    let asyncStream = AsyncStream(T.self, bufferingPolicy: limit, build)
    var iterator = asyncStream.makeAsyncIterator()

    return sequence(state: ()) { _ in
        let semaphore = ValueSemaphore<T?>()
        Task {
            semaphore.signal(await iterator.next())
        }
        return semaphore.wait()
    }
}

However, for your usecase I'd probably fall back to the old ways of going up and down the tree yourself, especially if you wish to implement breadth first visiting.

1 Like

For giggles and grins, I took a pass at writing a simple SyncStream, a synchronous analog to AsyncStream:

final class SyncStream<Element>: Sequence, @unchecked Sendable {
    private var buffer: [Element] = []
    private var isFinished = false
    private let condition = NSCondition()

    init(_ build: @Sendable (Continuation) -> Void) {
        let continuation = Continuation { [weak self] value in
            guard let self else { return }
            condition.lock()
            buffer.append(value)
            condition.signal()
            condition.unlock()
        } finish: { [weak self] in
            guard let self else { return }
            condition.lock()
            isFinished = true
            condition.broadcast()
            condition.unlock()
        }

        build(continuation)
    }

    func makeIterator() -> AnyIterator<Element> {
        AnyIterator { [self] in
            condition.withLock {
                while buffer.isEmpty, !isFinished {
                    condition.wait()
                }

                if !buffer.isEmpty {
                    return buffer.removeFirst()
                } else {
                    return nil
                }
            }
        }
    }
}

extension SyncStream {
    final class Continuation: Sendable {
        private let yieldClosure: @Sendable (Element) -> Void
        private let finishClosure: @Sendable () -> Void

        init(yield: @Sendable @escaping (Element) -> Void, finish: @Sendable @escaping () -> Void) {
            self.yieldClosure = yield
            self.finishClosure = finish
        }

        func yield(_ value: Element) {
            yieldClosure(value)
        }

        func finish() {
            finishClosure()
        }
    }
}

Then you can do things like:

let sequence = SyncStream { continuation in
    for i in 0 ..< 10 {
        continuation.yield(i)
    }
    continuation.finish()
}

for value in sequence {
    print(value)
}

Or, I might push the work to another thread if the sequence takes a long time to generate. E.g., here is a rendition with a Thread.sleep to simulate some slow yielding process:

let sequence2 = SyncStream { continuation in
    DispatchQueue.global().async {
        for i in 0 ..< 10 {
            continuation.yield(i)
            Thread.sleep(forTimeInterval: 1)
        }
        continuation.finish()
    }
}

for value in sequence2 {
    print(value)
}

Anyway, if I wanted to do this a lot, I might write a convenience method to do that for me:

extension SyncStream {
    /// Initializes the stream and runs the continuation on a background thread.
    static func background(
        qos: DispatchQoS = .unspecified,
        build: @Sendable @escaping (Continuation) -> Void
    ) -> SyncStream {
       SyncStream { continuation in
            DispatchQueue(label: "SyncStream", qos: qos).async {
                build(continuation)
            }
        }
    }
}

And then:

let sequence3 = SyncStream.background { continuation in
    for i in 0 ..< 10 {
        continuation.yield(i)
        Thread.sleep(forTimeInterval: 1)
    }
    continuation.finish()
}

for value in sequence3 {
    print(value)
}

Now, clearly I haven’t implemented all the richness of AsyncStream. And it seems like there are all sorts of possible variations of the theme. And I confess that I threw this together without a ton of testing. But maybe this is a starting point.


For example, you might want back-pressure semantics, e.g., perhaps have the build closure wait when the buffer is full (to avoid an unbridled buffer size):

class SyncStream<Element>: @unchecked Sendable {
    private var buffer: [Element] = []
    private var isFinished = false
    private let maxBufferSize: Int?
    private let condition = NSCondition()
    
    init(
        maxBufferSize: Int? = nil,
        _ build: (Continuation) -> Void
    ) {
        self.maxBufferSize = maxBufferSize
        
        let continuation = Continuation { [weak self] value in
            guard let self else { return }
            
            condition.lock()
            while let max = self.maxBufferSize, buffer.count >= max {
                condition.wait()
            }
            buffer.append(value)
            condition.signal() // notify consumer
            condition.unlock()
        } finish: { [weak self] in
            guard let self else { return }
            condition.lock()
            isFinished = true
            condition.broadcast() // notify all waiters
            condition.unlock()
        }
        build(continuation)
    }
}

extension SyncStream: Sequence {
    func makeIterator() -> AnyIterator<Element> {
        AnyIterator { [self] in
            condition.lock()
            defer { condition.unlock() }

            while buffer.isEmpty, !isFinished {
                condition.wait()
            }

            if !buffer.isEmpty {
                let value = buffer.removeFirst()
                condition.signal() // notify producer (if waiting)
                return value
            } else {
                return nil
            }
        }
    }
}

extension SyncStream {
    final class Continuation: Sendable {
        private let yieldClosure: @Sendable (Element) -> Void
        private let finishClosure: @Sendable () -> Void

        fileprivate init(
            yield: @Sendable @escaping (Element) -> Void,
            finish: @Sendable @escaping () -> Void
        ) {
            self.yieldClosure = yield
            self.finishClosure = finish
        }

        func yield(_ value: Element) {
            yieldClosure(value)
        }

        func finish() {
            finishClosure()
        }
    }
}

extension SyncStream {
    /// Initializes the stream and runs the continuation on a background thread.
    static func background(
        maxBufferSize: Int? = nil,
        qos: DispatchQoS = .unspecified,
        build: @Sendable @escaping (Continuation) -> Void
    ) -> SyncStream {
        SyncStream(maxBufferSize: maxBufferSize) { continuation in
            DispatchQueue(label: "SyncStream", qos: qos).async {
                build(continuation)
            }
        }
    }
}

And then:

let sequence4 = SyncStream.background(maxBufferSize: 10) { continuation in
    for i in 0 ..< 100 {
        continuation.yield(i)
    }
    continuation.finish()
}

for value in sequence4 {
    print(value)
    Thread.sleep(forTimeInterval: 0.2)
}

Note, if you have the build closure wait once the buffer fills, it becomes essential to ensure that the build closure runs on a different thread, or else it can deadlock, which is why I used my background rendition in this scenario.

As you can see, you can go nuts with whatever buffer/back-pressure semantics make sense for you. But if your tree is potentially huge, it probably makes sense to limit the buffer somehow.


But the bottom line, I probably would avoid creating an AsyncStream if it is really a synchronous sequence. I’d either write an Iterator or do something like this.

3 Likes

I love this solution as I think it solves the actual problem as stated by showing how we can implement generator functions ourselves, without resorting to structured concurrency.

As Karl pointed out earlier

Amen to that, though I suspect we'll end up with a truly async Swift 7 first, which isn't so bad I guess.