OK, so maybe I have something useful here. It extends the idea of an AsyncSerialResource into something which allows responses to be sent back to the caller serially, all using structured concurrency. Using trendy parlance, it creates a unidirectional data-flow between caller and callee.
I borrowed the name Reactor as it seems to fit better. Here's the outline:
public final class AsyncReactor<Context, Output: Sendable>: AsyncSequence {
public init(_ context: Context, outputType output: Output.Type)
public func perform( _ action: @Sendable @escaping (Context, @Sendable @escaping (Output) -> Void) async -> Void)
}
And, it is itself an AsyncSequence which allows two-way communication between caller and callee while maintaining a linear communication contract.
Here's a contrived example of how you might use it:
final class SomeObservable: ObservableObject {
enum State: Sendable { case loading, loaded(Image), failure(Error?) }
@Published private (set) var state = State.loading
private let loader = AsyncReactor(ImageLoader(), outputType: State.self)
init() {
// All responses from the Reactor are received and processed serially as there's just one long
// running task processing the output, and one long running task inside the Reactor processing
// the input that is throttled by the throughput of the output processing task. This creates a
// uni-directional flow of data.
Task {
for await reaction in loader {
// I think we can even do some async thing here which would suspend execution in the
// Reactor until we complete this cycle of the loop?
await someAsyncThing()
self.state = reaction
}
}
}
func someAsyncThing() async {}
func nextSerialImage() {
// calls to the Reactor are guaranteed to be called in serial order and without interlaving
// between suspension points
loader.perform { loader, send in
// That means we can update the caller as we make forward progress and it should work
// as you'd expect.
await send(.loading)
do {
let image = try await loader.getNextImageOverNetwork()
await send(.loaded(image))
}
catch let error {
await send(.failure(error))
}
}
}
}
// I'm not sure Image will end up being Sendable, but let's pretend for the sake of example
extension Image: @unchecked Sendable {}
The 'context' here holds one resource, but you could wrap multiple resources in a type (database, image cache, etc.) and pass that as the context, it would work as expected.
Implementation:
// For multicast support, we lean on the AsyncChannel type from Apple's open
// source AsyncAlgorithms package.
import AsyncAlgorithms
public final class AsyncReactor<Context, Output: Sendable> {
private let context: Context
private let sendInput: AsyncStream<() async -> Void>.Continuation
private let outputChannel = AsyncChannel(element: Output.self)
public init(_ context: Context, outputType output: Output.Type) {
self.context = context
let (inputStream, input)
= AsyncStream<() async -> Void>.withExtractedContinuation((() async -> Void).self)
self.sendInput = input
Task {
for await action in inputStream {
await action()
}
}
}
public func perform(
_ action: @Sendable @escaping (Context, @Sendable @escaping (Output) async -> Void) async -> Void
) {
sendInput.yield { [context, outputChannel] in
await action(context) { output in
await outputChannel.send(output)
}
}
}
}
extension AsyncReactor: AsyncSequence {
public typealias AsyncIterator = AsyncChannel<Output>.Iterator
public typealias Element = Output
public func makeAsyncIterator() -> AsyncChannel<Output>.Iterator {
outputChannel.makeAsyncIterator()
}
}
fileprivate extension AsyncStream {
static func withExtractedContinuation<T: Sendable>(
_ type: T.Type
) -> (AsyncStream<T>, AsyncStream<T>.Continuation) {
var extractedContinuation: AsyncStream<T>.Continuation! = nil
let stream = AsyncStream<T>(T.self, bufferingPolicy: .unbounded) { continuation in
extractedContinuation = continuation
}
return (stream, extractedContinuation)
}
}
What's great is that with the multicast support from AsyncChannel, it should be viable to share the Reactor around without issues. When you're in a 'perform' block, you have exclusive access to the reactor context for the duration that the call is executing.
You could, for example, wrap it in an ObservableObject, and share it with multiple SwiftUI views all receiving updates and being able to perform operations serially without issue.