mickeyl
(Dr. Mickey Lauer)
1
Bear with me, I can't show a minimal example yet, but you might get the gist from the example below.
I have an actor that communicates over a serial line with an adapter. Upon send, it creates a continuation and stores it. Later, when the response has been completed, I try to resume the continuation, but nothing happens. Any idea what could be wrong?
//
// Cornucopia – (C) Dr. Lauer Information Technology
//
import Foundation
typealias Continuation = CheckedContinuation<String, Error>
public enum StreamError: Error {
case invalidEncoding
}
public class StreamCommand {
enum State {
case created
case transmitting
case transmitted
case responding
case completed
}
private var outputBuffer: [UInt8] = []
private var inputBuffer: [UInt8] = []
private var tempBuffer: [UInt8] = .init(repeating: 0, count: 8192)
var state: State = .created {
didSet {
print("state now \(state)")
}
}
let continuation: Continuation
let termination: [UInt8]
var canWrite: Bool { self.state == .created || self.state == .transmitting }
var canRead: Bool { self.state == .transmitted || self.state == .responding }
var isCompleted: Bool { self.state == .completed }
init(string: String, timeout: TimeInterval, termination: String, continuation: Continuation) {
self.outputBuffer = Array(string.utf8)
self.termination = Array(termination.utf8)
self.continuation = continuation
}
func write(to stream: OutputStream) {
precondition(self.canWrite)
self.state = .transmitting
let written = stream.write(&outputBuffer, maxLength: outputBuffer.count)
outputBuffer.removeFirst(written)
print("wrote \(written) bytes")
if outputBuffer.isEmpty {
self.state = .transmitted
}
}
func read(from stream: InputStream) {
precondition(self.canRead)
self.state = .responding
let read = stream.read(&self.tempBuffer, maxLength: self.tempBuffer.count)
print("read \(read) bytes")
self.inputBuffer += self.tempBuffer[0..<read]
guard let terminationRange = self.inputBuffer.lastRange(of: self.termination) else {
return
}
self.state = .completed
}
func resumeContinuation() {
guard let response = String(bytes: self.inputBuffer, encoding: .utf8) else {
self.continuation.resume(throwing: StreamError.invalidEncoding)
return
}
self.continuation.resume(returning: response)
}
}
public actor StreamCommandQueue: NSObject {
let input: InputStream
let output: OutputStream
var pendingCommands: [StreamCommand] = []
var activeCommand: StreamCommand?
let termination: String
init(input: InputStream, output: OutputStream, termination: String = "") {
self.input = input
self.output = output
self.termination = termination
super.init()
self.input.delegate = self
self.output.delegate = self
self.input.schedule(in: RunLoop.current, forMode: .default)
self.output.schedule(in: RunLoop.current, forMode: .default)
self.input.open()
}
func send(string: String, timeout: TimeInterval) async throws -> String {
let response: String = try await withCheckedThrowingContinuation { continuation in
self.activeCommand = StreamCommand(string: string, timeout: timeout, termination: self.termination, continuation: continuation)
self.outputActiveCommand()
}
return response
}
func outputActiveCommand() {
guard self.output.streamStatus == .open else { return self.output.open() }
guard self.output.hasSpaceAvailable else { return }
guard let command = self.activeCommand else { fatalError() }
guard command.canWrite else {
print("command sent, waiting for response...")
return
}
command.write(to: self.output)
}
func inputActiveCommand() {
guard self.input.streamStatus == .open else { return }
guard self.input.hasBytesAvailable else { return }
guard let command = self.activeCommand else { fatalError("received unsolicited bytes") }
guard command.canRead else {
print("command not ready for reading...")
return
}
command.read(from: self.input)
if command.isCompleted {
command.resumeContinuation()
}
}
}
extension StreamCommandQueue: StreamDelegate {
nonisolated public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
print("stream \(aStream), event \(eventCode)")
switch (aStream, eventCode) {
case (self.output, .hasSpaceAvailable):
async { await self.outputActiveCommand() }
case (self.input, .hasBytesAvailable):
async { await self.inputActiveCommand() }
default:
break
}
}
}
I briefly scanned continuation proposal and PartialAsyncTask.swift source and couldn't find any info on whether it's allowed to escape the continuation from the closure or not (like it was with Unsafe*Pointer). However, since there is some Builtin stuff going on there, my guess that it's not welcomed. Otherwise we could have API like EventLoopPromise from SwiftNIO.
mickeyl
(Dr. Mickey Lauer)
3
Ok, if this was the case it would severely limit the usefulness of bridging async and completion-handler-based worlds. Besides it silently failing (which would be sad in any case), I really hope it‘s just a bug.
1 Like
Escaping the continuation is the expected use pattern.
You're sure that you're actually resuming the continuation?
1 Like
mickeyl
(Dr. Mickey Lauer)
5
100%, the breakpoint gets hit, but even „stepIn“ goes right over to the next line.
Is there a way to help with better diagnostics? Po‘ing the continuation in lldb fails as well, btw.
Resuming a continuation is asynchronous, so it's expected that stepping in/over a resumption doesn't immediately cause something to happen. But the asynchronous function should start executing again.
1 Like
Jon_Shier
(Jon Shier)
7
This is pretty surprising to me. If this is the case, why the global with functions rather than a closure-based initializer, like Task has settled on? My assumption was always that the continuation shouldn't escape its providing closure, given the API's similarity to the various unsafe APIs already in Swift which bound unsafe access to the closure. Rereading the original continuations proposal, there's no mention of why the global functions were chosen over initializers. However, it does show an escaping use, which I didn't realize before.
mickeyl
(Dr. Mickey Lauer)
8
Ok, here is a minimal, self-contained, example that shows the problem:
//
// Cornucopia – (C) Dr. Lauer Information Technology
//
import Foundation
typealias Continuation = CheckedContinuation<String, Error>
public enum StreamError: Error {
case invalidEncoding
}
public class StreamCommand {
let continuation: Continuation
init(continuation: Continuation) {
self.continuation = continuation
}
func resumeContinuation() {
let response = "fooBar"
print("<triggering continuation resume> \(self.continuation)")
self.continuation.resume(returning: response)
print("</triggering continuation resume>")
}
}
public actor StreamCommandQueue: NSObject {
var activeCommand: StreamCommand?
func send(string: String, timeout: TimeInterval) async throws -> String {
print("awaiting...")
let response: String = try await withCheckedThrowingContinuation { continuation in
print("continuation: \(continuation)")
self.activeCommand = StreamCommand(continuation: continuation)
self.outputActiveCommand()
}
print("came back after awaiting")
return response
}
func outputActiveCommand() {
async {
self.activeCommand?.resumeContinuation()
}
}
func inputActiveCommand() {
}
}
func doIt() {
async {
let streamQueue = StreamCommandQueue()
do {
let identification = try await streamQueue.send(string: "ATI\r", timeout: 1)
print("identification: \(identification)")
} catch {
print("can't get identification: \(error)")
}
}
}
1 Like
mickeyl
(Dr. Mickey Lauer)
9
Further inspection shows that this seems to be an issue with the actor – changing the actor to a class makes the program work.
Opened an issue @ [SR-14841] Concurrency: Resuming a stored continuation from an actor does not work · Issue #57188 · apple/swift · GitHub
jmjauer
(Johannes Auer)
10
Hey, maybe that is related:
Swift
Known Issues
- The
async doc://com.apple.documentation/documentation/swift/task APIs in macOS, watchOS, and tvOS differ from the async Task APIs in iOS. As a result, Task API calls on Actor objects in multi-platform projects may cause async functions to hang at a suspension point. (79378627)
1 Like
mickeyl
(Dr. Mickey Lauer)
11
I don‘t have an iOS15 device to test… perhaps someone here could test drive this theory?
Jon_Shier
(Jon Shier)
12
You should be able to test it by running your code in another platform simulator, like tvOS, or on a Monterey instance.
Perhaps you're scheduling the stream on a runloop that is not running? The actor is running on its own serial queue.