Which do you think is the better asynchronous architecture: feeding updates to an AsyncStream<Update>.Continuation (nonisolated because it is a let constant) and consuming them sequentially on a dedicated task, or spawning a new task to send each update to an actor-isolated method?
Pattern 1:
actor Foo
{
    enum Update
    {
        ...
    }

    let inbox:AsyncStream<Update>.Continuation
    private
    let stream:AsyncStream<Update>

    init()
    {
        // The build closure runs synchronously, so the continuation
        // can be smuggled out through a local variable.
        var escapee:AsyncStream<Update>.Continuation? = nil
        self.stream = .init
        {
            escapee = $0
        }
        guard let inbox:AsyncStream<Update>.Continuation = escapee
        else
        {
            fatalError("unreachable")
        }
        self.inbox = inbox
    }

    // called from a dedicated task, runs indefinitely
    func run() async
    {
        for await update:Update in self.stream
        {
            ...
        }
    }
}
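For comparison, the second pattern (a new task per update) might look roughly like the sketch below. The names `Counter`, `handle(_:)`, `send(_:)` and the `Update` cases are hypothetical, made up only to have something concrete to look at.

```swift
// Hypothetical sketch of the task-per-update pattern; all names are assumptions.
actor Counter {
    enum Update: Sendable {
        case increment
        case add(Int)
    }

    private(set) var total: Int = 0

    // Actor-isolated: callers outside the actor must await it.
    func handle(_ update: Update) {
        switch update {
        case .increment:
            self.total += 1
        case .add(let amount):
            self.total += amount
        }
    }

    // Callable from synchronous code. Each call spawns an unstructured task,
    // and nothing here guarantees the tasks run in the order they were created.
    @discardableResult
    nonisolated func send(_ update: Update) -> Task<Void, Never> {
        Task { await self.handle(update) }
    }
}
```

Returning the task handle is the only way a caller can wait for or cancel an individual update here; if the handle is discarded, the task simply runs to completion on its own.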
I've been thinking about this for a while, and I'm increasingly confident that using a stream is the way to go.
The big problem with task-per-update is that all the tasks you create are unstructured. They just...pile up, uncancelable, and effectively unordered. They force you to write manual cancellation functions that drop your resources in order to stop getting notified of changes. They don't let you easily and effectively cancel an entire operation using the regular structured concurrency primitives.
I see only one advantage of task-per-update: it's really easy to write. That makes it tempting, but probably not the right choice for a design.
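To make the cancellation point concrete, here is a minimal sketch of the stream approach running under a task group. It assumes Swift 5.9 or later for `AsyncStream.makeStream(of:)`; the names `Inbox`, `received` and `runUntilCancelled(_:)` are hypothetical.

```swift
// Hypothetical sketch; `Inbox` and `runUntilCancelled` are made-up names.
actor Inbox {
    let continuation: AsyncStream<Int>.Continuation
    private let stream: AsyncStream<Int>
    private(set) var received: [Int] = []

    init() {
        // makeStream(of:) is an assumption about the toolchain (Swift 5.9+).
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        self.stream = stream
        self.continuation = continuation
    }

    // Consumes updates one at a time until the stream ends
    // or the surrounding task is cancelled.
    func run() async {
        for await value in self.stream {
            self.received.append(value)
        }
    }
}

func runUntilCancelled(_ inbox: Inbox) async {
    await withTaskGroup(of: Void.self) { group in
        group.addTask { await inbox.run() }
        inbox.continuation.yield(1)
        inbox.continuation.yield(2)
        // Cancelling the group cancels the consumer, which ends its loop.
        group.cancelAll()
        // Leaving the closure waits for the child task to finish.
    }
}
```

There is no manual teardown function here: cancelling the parent is enough, and once the consumer has been cancelled, further calls to `yield` on the continuation should report `.terminated`.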
Ha ha! This is very interesting. I'm still spoiled by DispatchQueue and its free ordering guarantees (FIFO), and I was wondering how the new async APIs could be made to provide any. This thread has turned on the light I was missing. Thank you both!
EDIT: ordering should be reliable as long as elements are yielded to the continuation from a single concurrent context, according to the documentation:
The AsyncStream.Continuation [...] is appropriate for use in concurrent contexts. It is thread safe to send and finish; all calls to the continuation are serialized. However, calling this from multiple concurrent contexts could result in out-of-order delivery.
// Elements are guaranteed to be processed in the same
// order as the stream. As one element is processed,
// other elements are buffered according to the buffering
// policy of the stream:
for await element in stream {
    await process(element)
}
From the documentation quoted above, I understand that no element is ever dropped: yielding from multiple concurrent contexts can affect ordering, but it will never result in data loss. Is this correct?
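One way to check this is a small experiment with the buffering policy, since the quoted comment says elements are buffered "according to the buffering policy of the stream". The sketch below assumes Swift 5.9 or later for `AsyncStream.makeStream(of:bufferingPolicy:)`; the helper name `collect(policy:)` is hypothetical.

```swift
// Hypothetical helper: yields 1...5 from a single producer before any
// consumer is running, then collects whatever the stream still holds.
func collect(
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: policy
    )
    for value in 1...5 {
        continuation.yield(value)
    }
    continuation.finish()

    // Buffered elements are still delivered after finish().
    var seen: [Int] = []
    for await value in stream {
        seen.append(value)
    }
    return seen
}
```

With the default `.unbounded` policy I would expect all five elements back in order. With `.bufferingNewest(2)` only `4` and `5` should survive, and with `.bufferingOldest(2)` only `1` and `2`. So "never dropped" seems to hold only for the unbounded policy, and the `YieldResult` returned by `yield` (`.enqueued`, `.dropped`, `.terminated`) tells the producer which case it hit.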