Bear with me: I can't show a minimal example yet, but you might get the gist from the code below.
I have an actor that communicates with an adapter over a serial line. Upon `send`, it creates a continuation and stores it. Later, when the response is complete, I try to resume the continuation, but nothing happens. Any idea what could be wrong?
//
// Cornucopia – (C) Dr. Lauer Information Technology
//
import Foundation
/// Continuation that is resumed with a `String` result or with a thrown `Error`.
typealias Continuation = CheckedContinuation<String, Error>
/// Errors produced while handling stream commands.
public enum StreamError: Error {
/// Signals that bytes could not be decoded into a string with the expected encoding.
case invalidEncoding
}
/// One request/response exchange over a pair of streams.
///
/// The command owns the bytes still to be written, accumulates the bytes read
/// back, and holds the continuation that is resumed with the decoded response.
/// It advances through `State` as `write(to:)` and `read(from:)` are called.
public class StreamCommand {

    /// Lifecycle of a command.
    enum State {
        /// Nothing has been written yet.
        case created
        /// At least one write was attempted; bytes may still be outstanding.
        case transmitting
        /// Every request byte has been written.
        case transmitted
        /// At least one read was attempted; the terminator has not been seen yet.
        case responding
        /// The response is complete.
        case completed
    }

    /// Request bytes that still have to be written.
    private var outputBuffer: [UInt8] = []
    /// Response bytes received so far.
    private var inputBuffer: [UInt8] = []
    /// Scratch space handed to `InputStream.read(_:maxLength:)`.
    private var tempBuffer: [UInt8] = .init(repeating: 0, count: 8192)

    /// Current lifecycle state; every change is logged.
    var state: State = .created {
        didSet {
            print("state now \(state)")
        }
    }

    /// Continuation resumed by `resumeContinuation()`.
    let continuation: Continuation
    /// Byte sequence that marks the end of a response.
    let termination: [UInt8]

    /// `true` while request bytes may still be written.
    var canWrite: Bool { state == .created || state == .transmitting }
    /// `true` once the request is fully written and the response is still incomplete.
    var canRead: Bool { state == .transmitted || state == .responding }
    /// `true` once the response is complete.
    var isCompleted: Bool { state == .completed }

    /// Creates a command.
    ///
    /// - Parameters:
    ///   - string: Request text; sent as UTF-8.
    ///   - timeout: Accepted for interface compatibility; this class does not use it.
    ///   - termination: Text whose UTF-8 bytes mark the end of the response.
    ///   - continuation: Continuation to resume once the response is available.
    init(string: String, timeout: TimeInterval, termination: String, continuation: Continuation) {
        self.outputBuffer = Array(string.utf8)
        self.termination = Array(termination.utf8)
        self.continuation = continuation
    }

    /// Writes as many outstanding request bytes as `stream` accepts.
    ///
    /// Moves to `.transmitted` once nothing is left to write.
    ///
    /// - Precondition: `canWrite` is `true`.
    func write(to stream: OutputStream) {
        precondition(canWrite)
        state = .transmitting
        let written = stream.write(&outputBuffer, maxLength: outputBuffer.count)
        print("wrote \(written) bytes")
        // `write` reports failure with a negative count; passing that to
        // `removeFirst(_:)` would trap, so only consume what was really sent.
        if written > 0 {
            outputBuffer.removeFirst(written)
        }
        if outputBuffer.isEmpty {
            state = .transmitted
        }
    }

    /// Reads the bytes currently available on `stream` and appends them to the response.
    ///
    /// Moves to `.completed` once the accumulated response contains the terminator.
    ///
    /// - Precondition: `canRead` is `true`.
    func read(from stream: InputStream) {
        precondition(canRead)
        state = .responding
        let count = stream.read(&tempBuffer, maxLength: tempBuffer.count)
        print("read \(count) bytes")
        // A negative count signals an error and zero signals end of stream;
        // neither yields data, and slicing `0..<count` with -1 would trap.
        guard count > 0 else { return }
        inputBuffer += tempBuffer[0..<count]
        // An empty terminator can never be found by a search, which would leave
        // the command pending forever. Treat it as "no terminator": the first
        // chunk of data completes the response.
        // NOTE(review): confirm this is the intended meaning of an empty terminator.
        if termination.isEmpty || inputBuffer.lastRange(of: termination) != nil {
            state = .completed
        }
    }

    /// Resumes the continuation with the received bytes decoded as UTF-8.
    ///
    /// Resumes by throwing `StreamError.invalidEncoding` when the bytes are not
    /// valid UTF-8. Must be called at most once per command.
    func resumeContinuation() {
        if let response = String(bytes: inputBuffer, encoding: .utf8) {
            continuation.resume(returning: response)
        } else {
            continuation.resume(throwing: StreamError.invalidEncoding)
        }
    }
}
/// Serializes request/response commands over an input/output stream pair.
///
/// At most one command is in flight at a time. Commands sent while another
/// one is still unfinished wait in `pendingCommands` and are started in order.
public actor StreamCommandQueue: NSObject {

    /// Stream the responses are read from.
    let input: InputStream
    /// Stream the requests are written to.
    let output: OutputStream
    /// Commands waiting for the active command to complete, oldest first.
    var pendingCommands: [StreamCommand] = []
    /// Command currently being transmitted or awaiting its response. After
    /// completion it stays in place until the next command replaces it.
    var activeCommand: StreamCommand?
    /// Terminator passed on to every command created by `send(string:timeout:)`.
    let termination: String

    /// Creates a queue, registers it as delegate of both streams, schedules
    /// them on the current run loop and opens the input stream.
    ///
    /// The output stream is opened lazily by `outputActiveCommand()`.
    ///
    /// - Note: The streams are scheduled on `RunLoop.current`. Stream events are
    ///   delivered through that run loop, so it has to be running for any
    ///   command to make progress.
    init(input: InputStream, output: OutputStream, termination: String = "") {
        self.input = input
        self.output = output
        self.termination = termination
        super.init()
        input.delegate = self
        output.delegate = self
        input.schedule(in: RunLoop.current, forMode: .default)
        output.schedule(in: RunLoop.current, forMode: .default)
        input.open()
    }

    /// Sends `string` and suspends until the matching response has arrived.
    ///
    /// - Parameters:
    ///   - string: Request text.
    ///   - timeout: Forwarded to the created `StreamCommand`.
    /// - Returns: The response text.
    /// - Throws: Whatever error the command resumes its continuation with.
    func send(string: String, timeout: TimeInterval) async throws -> String {
        try await withCheckedThrowingContinuation { continuation in
            let command = StreamCommand(
                string: string,
                timeout: timeout,
                termination: self.termination,
                continuation: continuation
            )
            // The actor is re-entrant while a previous `send` is suspended.
            // Replacing an unfinished command would drop its continuation
            // without ever resuming it, so queue the new command instead.
            if let current = self.activeCommand, !current.isCompleted {
                self.pendingCommands.append(command)
            } else {
                self.activeCommand = command
                self.outputActiveCommand()
            }
        }
    }

    /// Writes the active command to the output stream if it is ready for it.
    ///
    /// Opens the output stream first when it is not open yet; the write is
    /// then retried when the stream reports available space.
    func outputActiveCommand() {
        guard output.streamStatus == .open else {
            output.open()
            return
        }
        guard output.hasSpaceAvailable else { return }
        guard let command = activeCommand else { fatalError() }
        guard command.canWrite else {
            print("command sent, waiting for response...")
            return
        }
        command.write(to: output)
    }

    /// Feeds available input to the active command and, once its response is
    /// complete, resumes its continuation and starts the next pending command.
    func inputActiveCommand() {
        guard input.streamStatus == .open else { return }
        guard input.hasBytesAvailable else { return }
        guard let command = activeCommand else { fatalError("received unsolicited bytes") }
        guard command.canRead else {
            print("command not ready for reading...")
            return
        }
        command.read(from: input)
        guard command.isCompleted else { return }
        command.resumeContinuation()
        // Promote the oldest waiting command, if any, and start transmitting it.
        if !pendingCommands.isEmpty {
            activeCommand = pendingCommands.removeFirst()
            outputActiveCommand()
        }
    }
}
// MARK: - StreamDelegate

extension StreamCommandQueue: StreamDelegate {

    /// Forwards stream events to the actor.
    ///
    /// The delegate callback is synchronous and runs outside the actor, so
    /// each relevant event starts a task that hops onto the actor. All other
    /// events are only logged.
    nonisolated public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        print("stream \(aStream), event \(eventCode)")
        switch (aStream, eventCode) {
        case (self.output, .hasSpaceAvailable):
            // `Task { }` replaces the pre-release `async { }` spelling.
            Task { await self.outputActiveCommand() }
        case (self.input, .hasBytesAvailable):
            Task { await self.inputActiveCommand() }
        default:
            break
        }
    }
}