AsyncStream Element delivery

Good morning!

I've been going through my projects code base trying to adapt some of its callback/delegation-based API's to Swift Concurrency world using AsyncStreams.

Looking at the documentation it seemed like the perfect choice AsyncStream | Apple Developer Documentation

especially the first paragraph where it states that:

Blockquote
In particular, an asynchronous stream is well-suited to adapt callback- or delegation-based APIs to participate with async -await .

But while investigating this further I noticed that AsyncStream doesn't really work the same way that a callback or delegate would - on a fundamental level.

Having a hypothetical method in Foo class and a Bar class as the delegate or callback consumer

func foo() {
 ...
delegate.bar(value) or callback(value)
 ...
}

Both delegate and callback will notify the Bar class synchronously. So the Foo class will give the control over to delegate that will then call the bar method. Once the work there is done, the Foo class will regain control and continue with whatever work there is to be done there.

I'm trying to understand how AsyncStream fits into situations like the one described above when there is some additional work to be done on the calling side. Maybe I need to update some model data in Bar class - refresh views, reload data etc. before I pop/dismiss the top Foo view.

It seems like this order of operations is no longer possible using AsyncStreams since elements produced using yield are not delivered immediately

func foo() {
 ...
continuation.yield(value) <--- locally stored continuation, registers value
 ...
}

foo return

<---- Bar consumes the value via stream

It seems to me that the quote from apple docs, the one I posted at the top, is false because those concepts are fundamentally different. They work differently. With callback/delegate I have full control over how and when it'll be called and delivered whereas the AsyncStream is fire-and-forget type of thing.

I'm not sure how those could be equivalent? It's possible that I'm missing something. This new Concurrency model is still a bit of a mystery to me so maybe someone could explain this to me?

It feels like for those concepts to be somewhat similar yield() should be suspending, switching to task related to stream observation, producing an element for the consumer and giving back control to the caller when element is consumed.

Maybe there is a way to actually achieve that?

The doc isn't suggesting you to replace all your delegate/callback APIs with the stream. While it is a good tool for the job occasionally (see StoreKit 2 APIs as an example), the reference you mention is actually about different thing. You can see in the doc example with QuakeMonitor that is callback-based API, and how it is adapted using AsyncStream. As more closer to Apple platforms example it might be NWPathListener (hope I correctly recall the name), which has same callback-based API. You also can have delegate-based APIs that are called from the background threads as well. In those cases you have already asynchronous events, so wrapping it in the AsyncStream just allows to bridge those APIs to Swift Concurrency.

You're right, it isn't. It does imply however that those concepts are interchangeable without highlighting the fundamental differences in the way they work and that you need to be very careful where you actually use AsyncStream. After reading the docs I was left with the impression that, in fact, I can replace all of delegate/callback APIs when in reality it's like you said -

it is a good tool for the job occasionally

I think the confusion is created by the word adopt here. However, given that there is the Adapter pattern in programming which has the same use case, that seems to be a reasonable chose of words to me, but might be not that obvious in the first place.

That's correct. Unfortunately, if you need a synchronous callback or delegate like observation, asynchronous sequences are the wrong tool.

Currently, apart from callbacks, the only built-in synchronous observation mechanism we have is Observable. However, its API is quite thin and therefore the use cases it enables are quite limited.

The AsyncStream implementation does not make this easy (because yield is fire-and-forget).

You might want to look at AsyncChannel if you need "backpressure" for your use-case.

There was also discussion about adding support for backpressure to AsyncStream (SE-0406)… but it looks like that is on hold for now.

You also have the option of trying to build your own AsyncSequence implementation with custom control over how elements are dispatched and what kind of "blocking" happens. You might use something like a continuation to suspend and wait for the listener-subscriber to consume the element that was just published. The AsyncStream and SE-0406 implementations might help give you some ideas if you want to go with a custom approach. Good luck!

Thank you for all the comments and suggestions, really appreciate it!