Async design patterns: stream/continuation vs task-per-update?

Which do you think is the better asynchronous architecture: feeding updates to an `AsyncStream<Update>.Continuation` (nonisolated, because it is a `let` constant) and consuming them sequentially on a dedicated task, or spawning a new task to send each update to an actor-isolated method?

pattern 1:

actor Foo
{
    enum Update 
    {
        ...
    }
    
    let inbox:AsyncStream<Update>.Continuation 
    private 
    let stream:AsyncStream<Update>
    
    init() 
    {
        var escapee:AsyncStream<Update>.Continuation?       = nil 
        self.stream = .init 
        {
            escapee = $0
        }
        guard let inbox:AsyncStream<Update>.Continuation    = escapee 
        else 
        {
            fatalError("unreachable: AsyncStream runs its build closure before init returns")
        }
        self.inbox      = inbox
    }
    // called from a dedicated task, runs indefinitely
    func run() async 
    {
        for await update:Update in self.stream 
        {
            ...
        }
    }
}

pattern 2:

actor Foo
{
    enum Update 
    {
        ...
    }
   
    init() 
    {
    }

    nonisolated 
    func update(with update:Update) 
    {
        Task 
        {
            await self._update(with: update)
        }
    }
    private 
    func _update(with update:Update) 
    {
        ...
    }
}

I've been thinking about this for a while, and I'm increasingly confident that using a stream is the way to go.

The big problem with task-per-update is that all the tasks you create are unstructured. They just pile up, uncancellable and effectively unordered. They force you to write manual teardown methods that release your resources just to stop getting notified of changes, and they prevent you from cancelling an entire operation with the regular structured-concurrency primitives.
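A minimal self-contained sketch of that pile-up (the `Log` actor and its member names are hypothetical, not from the thread): each call to `record` spawns an independent unstructured `Task`, so the hops onto the actor can land in any order, and there is no single handle to await or cancel.

```swift
actor Log
{
    private(set)
    var entries:[Int] = []

    // pattern 2: spawn one unstructured task per update
    nonisolated
    func record(_ value:Int)
    {
        Task
        {
            await self.append(value)
        }
    }
    private
    func append(_ value:Int)
    {
        self.entries.append(value)
    }
}

let log:Log = .init()
for i:Int in 0 ..< 100
{
    log.record(i)
}
// `entries` may fill in any order, and nothing here can await or
// cancel the 100 in-flight tasks
```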

I see only one advantage to task-per-update: it's really easy to write. That makes it tempting, but probably not the right choice for a design.
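To make the cancellation point concrete, here is a self-contained sketch of the stream pattern (`Counter` and its members are illustrative names, not the thread's `Foo`): the consumer loop lives in a single task, so finishing the stream, or cancelling that one task, tears down the whole pipeline with ordinary structured-concurrency tools.

```swift
actor Counter
{
    let inbox:AsyncStream<Int>.Continuation
    private
    let stream:AsyncStream<Int>

    private(set)
    var total:Int = 0

    init()
    {
        var escapee:AsyncStream<Int>.Continuation? = nil
        self.stream = .init { escapee = $0 }
        // the build closure runs synchronously, so this cannot be nil
        self.inbox = escapee!
    }
    func run() async
    {
        // iteration ends when the stream is finished, or when the
        // task driving this loop is cancelled
        for await update:Int in self.stream
        {
            self.total += update
        }
    }
}

let counter:Counter = .init()
let consumer:Task<Void, Never> = .init { await counter.run() }

counter.inbox.yield(1)
counter.inbox.yield(2)
counter.inbox.finish()

await consumer.value
print(await counter.total) // prints 3
```

Because all consumption funnels through `run()`, cancelling `consumer` (or the task group it would live in) is the single point of teardown that pattern 2 lacks.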


Ha ha! This is very interesting. I'm still spoiled by DispatchQueue and its free FIFO ordering guarantee, and I was wondering how the new async APIs could provide any. This thread has turned on the light I was missing :bulb: Thank you both!

EDIT: ordering should be reliable as long as the stream yields its elements from a single concurrent context, according to the documentation:

The AsyncStream.Continuation [...] is appropriate for use in concurrent contexts. It is thread safe to send and finish; all calls to the continuation are serialized. However, calling this from multiple concurrent contexts could result in out-of-order delivery.

// Elements are guaranteed to be processed in the same
// order as the stream. As one element is processed,
// other elements are buffered according to the buffering
// policy of the stream:
for await element in stream {
    await process(element)
}

Reading the documentation quoted above, my understanding is that no element is ever dropped: yielding from multiple concurrent contexts can affect ordering, but it never results in data loss. Is this correct?


That's definitely my understanding, yes.
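One caveat worth sketching out (this example is mine, not from the thread): the quoted documentation says elements are "buffered according to the buffering policy", and only the default `.unbounded` policy never discards anything. A bounded policy such as `.bufferingNewest` drops elements by design:

```swift
// a bounded buffering policy can discard elements
var escapee:AsyncStream<Int>.Continuation? = nil
let stream:AsyncStream<Int> = .init(bufferingPolicy: .bufferingNewest(2))
{
    escapee = $0
}
let continuation:AsyncStream<Int>.Continuation = escapee!

// no consumer is running yet, so the third yield evicts the oldest element
continuation.yield(1)
continuation.yield(2)
continuation.yield(3)
continuation.finish()

var received:[Int] = []
for await element:Int in stream
{
    received.append(element)
}
// received == [2, 3]; with the default .unbounded policy it would be [1, 2, 3]
```

So "never dropped" holds for the default policy; if you opt into a bounded buffer, loss is the trade-off you asked for.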
