Swift Concurrency: Resuming a stored continuation from an actor does not work

Bear with me, I can't show a minimal example yet, but you might get the gist from the example below.

I have an actor that communicates over a serial line with an adapter. Upon send, it creates a continuation and stores it. Later, when the response has been completed, I try to resume the continuation, but nothing happens. Any idea what could be wrong?

//
//  Cornucopia – (C) Dr. Lauer Information Technology
//
import Foundation

typealias Continuation = CheckedContinuation<String, Error>

public enum StreamError: Error {
    case invalidEncoding
}

public class StreamCommand {

    enum State {
        case created
        case transmitting
        case transmitted
        case responding
        case completed
    }

    private var outputBuffer: [UInt8] = []
    private var inputBuffer: [UInt8] = []
    private var tempBuffer: [UInt8] = .init(repeating: 0, count: 8192)
    var state: State = .created {
        didSet {
            print("state now \(state)")
        }
    }
    let continuation: Continuation
    let termination: [UInt8]

    var canWrite: Bool { self.state == .created || self.state == .transmitting }
    var canRead: Bool { self.state == .transmitted || self.state == .responding }
    var isCompleted: Bool { self.state == .completed }

    init(string: String, timeout: TimeInterval, termination: String, continuation: Continuation) {
        self.outputBuffer = Array(string.utf8)
        self.termination = Array(termination.utf8)
        self.continuation = continuation
    }

    func write(to stream: OutputStream) {
        precondition(self.canWrite)
        self.state = .transmitting

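        let written = stream.write(&outputBuffer, maxLength: outputBuffer.count)
        print("wrote \(written) bytes")
        // write(_:maxLength:) returns -1 on error; removeFirst(-1) would trap.
        guard written > 0 else { return }
        outputBuffer.removeFirst(written)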
        if outputBuffer.isEmpty {
            self.state = .transmitted
        }
    }

    func read(from stream: InputStream) {
        precondition(self.canRead)
        self.state = .responding

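        let read = stream.read(&self.tempBuffer, maxLength: self.tempBuffer.count)
        print("read \(read) bytes")
        // read(_:maxLength:) returns -1 on error and 0 at end of stream; a negative upper bound would trap.
        guard read > 0 else { return }
        self.inputBuffer += self.tempBuffer[0..<read]
        // Only the presence of the terminator matters here, so no binding is needed.
        guard self.inputBuffer.lastRange(of: self.termination) != nil else {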
            return
        }
        self.state = .completed
    }

    func resumeContinuation() {
        guard let response = String(bytes: self.inputBuffer, encoding: .utf8) else {
            self.continuation.resume(throwing: StreamError.invalidEncoding)
            return
        }
        self.continuation.resume(returning: response)
    }
}

public actor StreamCommandQueue: NSObject {

    let input: InputStream
    let output: OutputStream
    var pendingCommands: [StreamCommand] = []
    var activeCommand: StreamCommand?
    let termination: String

    init(input: InputStream, output: OutputStream, termination: String = "") {
        self.input = input
        self.output = output
        self.termination = termination

        super.init()

        self.input.delegate = self
        self.output.delegate = self

        self.input.schedule(in: RunLoop.current, forMode: .default)
        self.output.schedule(in: RunLoop.current, forMode: .default)

        self.input.open()
    }

    func send(string: String, timeout: TimeInterval) async throws -> String {

        let response: String = try await withCheckedThrowingContinuation { continuation in
            self.activeCommand = StreamCommand(string: string, timeout: timeout, termination: self.termination, continuation: continuation)
            self.outputActiveCommand()
        }
        return response
    }

    func outputActiveCommand() {

        guard self.output.streamStatus == .open else { return self.output.open() }
        guard self.output.hasSpaceAvailable else { return }
        guard let command = self.activeCommand else { fatalError() }
        guard command.canWrite else {
            print("command sent, waiting for response...")
            return
        }
        command.write(to: self.output)
    }

    func inputActiveCommand() {

        guard self.input.streamStatus == .open else { return }
        guard self.input.hasBytesAvailable else { return }
        guard let command = self.activeCommand else { fatalError("received unsolicited bytes") }
        guard command.canRead else {
            print("command not ready for reading...")
            return
        }
        command.read(from: self.input)
        if command.isCompleted {
            command.resumeContinuation()
        }
    }
}

extension StreamCommandQueue: StreamDelegate {

    nonisolated public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        print("stream \(aStream), event \(eventCode)")

        switch (aStream, eventCode) {
            case (self.output, .hasSpaceAvailable):
                async { await self.outputActiveCommand() }

            case (self.input, .hasBytesAvailable):
                async { await self.inputActiveCommand() }

            default:
                break
        }


    }

}

I briefly scanned the continuation proposal and the PartialAsyncTask.swift source and couldn't find any info on whether the continuation is allowed to escape the closure (as with Unsafe*Pointer, where it is not). However, since there is some Builtin stuff going on there, my guess is that it's not welcome. Otherwise we could have an API like EventLoopPromise from SwiftNIO.

OK, if that were the case, it would severely limit the usefulness of continuations for bridging the async and completion-handler-based worlds. Apart from the silent failure (which would be sad in any case), I really hope it's just a bug.
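
To illustrate why bridging depends on the continuation escaping, here is a minimal sketch. The names fetchGreeting(completion:) and greeting() are hypothetical stand-ins for any completion-handler API and its async wrapper; they are not from the code above. The continuation has to be captured by the escaping completion handler, which runs after the withCheckedThrowingContinuation closure has already returned.

```swift
import Foundation

// Hypothetical callback-based API, standing in for any completion-handler function.
func fetchGreeting(completion: @escaping @Sendable (Result<String, Error>) -> Void) {
    DispatchQueue.global().async {
        completion(.success("hello"))
    }
}

// Async wrapper: the continuation escapes into the completion handler
// and is resumed exactly once when the callback fires.
func greeting() async throws -> String {
    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<String, Error>) in
        fetchGreeting { result in
            continuation.resume(with: result)
        }
    }
}
```

A caller would simply write `let text = try await greeting()`.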


Escaping the continuation is the expected use pattern.

You're sure that you're actually resuming the continuation?
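
For reference, a minimal sketch of that pattern with an actor, assuming a current toolchain. ResponseBox and its members are hypothetical names for illustration, not the StreamCommandQueue from the question. The continuation is stored in actor state and resumed later from a different call; a response that arrives before anyone is waiting is buffered so that it isn't lost.

```swift
// Hypothetical actor for illustration only.
actor ResponseBox {
    private var pending: CheckedContinuation<String, Error>?
    private var early: String?

    // Suspends the caller until a response has been delivered.
    func waitForResponse() async throws -> String {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<String, Error>) in
            if let value = self.early {
                // The response arrived first: resume right away.
                self.early = nil
                continuation.resume(returning: value)
            } else {
                // Escape the continuation by storing it for later.
                self.pending = continuation
            }
        }
    }

    // Resumes the stored continuation exactly once, or buffers the response.
    func deliver(_ response: String) {
        if let continuation = pending {
            pending = nil
            continuation.resume(returning: response)
        } else {
            early = response
        }
    }
}
```

Usage would look like `Task { await box.deliver("fooBar") }` on one side and `try await box.waitForResponse()` on the other. `Task { }` is the current spelling of the `async { }` used in the question.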


100%. The breakpoint gets hit, but even "step in" goes right over to the next line.
Is there a way to help with better diagnostics? po'ing the continuation in lldb fails as well, by the way.

Resuming a continuation is asynchronous, so it's expected that stepping in/over a resumption doesn't immediately cause something to happen. But the asynchronous function should start executing again.
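
A small sketch of what that looks like in practice. awaitAnswer() is a hypothetical function for illustration. The line after resume(returning:) runs as soon as the call returns on the resuming thread, which is what the debugger shows when stepping; the awaiting function continues separately, and the relative order of the two is not guaranteed.

```swift
import Foundation

// Hypothetical demo: resumes from a background queue and returns the value.
func awaitAnswer() async -> Int {
    await withCheckedContinuation { (continuation: CheckedContinuation<Int, Never>) in
        DispatchQueue.global().async {
            print("before resume")
            continuation.resume(returning: 42)
            // Runs as soon as resume(returning:) returns. At this point the
            // awaiting function has been scheduled, but has not necessarily run.
            print("after resume")
        }
    }
}
```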


This is pretty surprising to me. If that's the case, why the global with* functions rather than a closure-based initializer, like Task has settled on? My assumption was always that the continuation shouldn't escape the closure that provides it, given the API's similarity to the various unsafe APIs already in Swift, which restrict unsafe access to the closure. Rereading the original continuations proposal, I see no mention of why the global functions were chosen over initializers. However, it does show an escaping use, which I hadn't noticed before.

OK, here is a minimal, self-contained example that shows the problem:

//
//  Cornucopia – (C) Dr. Lauer Information Technology
//
import Foundation

typealias Continuation = CheckedContinuation<String, Error>

public enum StreamError: Error {
    case invalidEncoding
}

public class StreamCommand {

    let continuation: Continuation

    init(continuation: Continuation) {
        self.continuation = continuation
    }

    func resumeContinuation() {
        let response = "fooBar"
        print("<triggering continuation resume> \(self.continuation)")
        self.continuation.resume(returning: response)
        print("</triggering continuation resume>")
    }
}

public actor StreamCommandQueue: NSObject {

    var activeCommand: StreamCommand?

    func send(string: String, timeout: TimeInterval) async throws -> String {

        print("awaiting...")
        let response: String = try await withCheckedThrowingContinuation { continuation in
            print("continuation: \(continuation)")
            self.activeCommand = StreamCommand(continuation: continuation)
            self.outputActiveCommand()
        }
        print("came back after awaiting")
        return response
    }

    func outputActiveCommand() {
        async {
            self.activeCommand?.resumeContinuation()
        }
    }

    func inputActiveCommand() {

    }
}

func doIt() {

    async {
        let streamQueue = StreamCommandQueue()
        do {
            let identification = try await streamQueue.send(string: "ATI\r", timeout: 1)
            print("identification: \(identification)")
        } catch {
            print("can't get identification: \(error)")
        }
    }

}



Further inspection shows that this seems to be an issue with the actor – changing the actor to a class makes the program work.

Opened an issue: [SR-14841] "Concurrency: Resuming a stored continuation from an actor does not work" (Issue #57188, apple/swift on GitHub).

Hey, maybe that is related:

Swift

Known Issues

  • The async Task APIs in macOS, watchOS, and tvOS differ from the async Task APIs in iOS. As a result, Task API calls on Actor objects in multi-platform projects may cause async functions to hang at a suspension point. (79378627)

I don't have an iOS 15 device to test with. Perhaps someone here could test-drive this theory?

You should be able to test it by running your code in another platform simulator, like tvOS, or on a Monterey instance.

Perhaps you're scheduling the stream on a runloop that is not running? The actor is running on its own serial queue.
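
If that turns out to be the cause, one thing to try is scheduling the streams on a run loop that is known to be running, such as the main run loop in an app, instead of RunLoop.current. A minimal sketch, where openStreams(input:output:delegate:) is a hypothetical helper and not part of the original code:

```swift
import Foundation

// Hypothetical helper: schedules both streams on the main run loop,
// which an app keeps running, rather than on RunLoop.current, which
// may belong to a thread that never runs its run loop.
func openStreams(input: InputStream, output: OutputStream, delegate: StreamDelegate) {
    // Stream delegates are not retained, so the caller must keep `delegate` alive.
    input.delegate = delegate
    output.delegate = delegate

    input.schedule(in: RunLoop.main, forMode: .default)
    output.schedule(in: RunLoop.main, forMode: .default)

    input.open()
    output.open()
}
```

The delegate callbacks then arrive on the main thread, so the hop into the actor from the nonisolated delegate method is still needed.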