Async function inside async stream

Hi all, I am using an AsyncStream to propagate updates from a network download.

Inside the stream's closure I call an async function, which forces me to wrap the entire body in a Task; otherwise I get a compile error. E.g.

AsyncStream<Asset<IdentifiableType>> { continuation in
    Task {
        let someAsyncWorkResult = await someAsyncFunction()
    }
}

My main question is: is it OK to have an async unit of work inside an async stream? (I haven't observed any issues yet.) Is there a nicer way of handling the above?

Not sure :man_shrugging: I haven't noticed any issues so far either.
:thinking:

You're using the continuation-based initializer, which is designed to bridge synchronous and asynchronous code. Have you tried the dedicated initializer that takes an async function instead, init(unfolding:onCancel:)?
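For readers following along, here is a minimal sketch of what the unfolding initializer could look like for a download-style source. The `Downloader` actor, its `nextChunk()` method, and `makeDownloadStream` are hypothetical names made up for illustration; only `AsyncStream(unfolding:)` itself comes from the standard library.

```swift
// Hypothetical downloader standing in for the real network code.
actor Downloader {
    private var chunks = [10, 20, 30]

    // Returns the next chunk, or nil once the download is complete.
    func nextChunk() -> Int? {
        chunks.isEmpty ? nil : chunks.removeFirst()
    }
}

func makeDownloadStream(_ downloader: Downloader) -> AsyncStream<Int> {
    // The unfolding closure is itself async, so no wrapping Task is needed.
    // Returning nil from the closure ends the stream.
    AsyncStream<Int>(unfolding: { await downloader.nextChunk() })
}
```

The stream calls the closure each time the consumer asks for the next element, so the async work is pulled on demand rather than pushed from a separate Task.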

Thank you so much @MPLewis, that helps a lot. May I ask whether it's possible to use a throwing function inside?

AsyncStream<MyObject>.init {
    guard let somethingImportant = try await doSomeAsyncWork() else {
        throw MyCustomError.importantThingFailed
    }
    …
} onCancel: {

}

I get an error:

Invalid conversion from throwing function of type '() async throws -> MyObject' to non-throwing function type '() async -> MyObject'

You can use an AsyncThrowingStream for this.

Yes, but I don't see an initializer on AsyncThrowingStream that takes an async function. (Sorry, I'm still quite new to async/await.)

Also, I wouldn't want to terminate the stream on a throw. (For context, I'm using it to bridge a delegate callback to the async world, similar to Apple's examples with LocationManager and the quake monitor.)

It's down at the bottom of that page I linked: init(unfolding:) in the Apple Developer Documentation.
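As a rough sketch of that initializer in use: the `WorkSource` actor and `makeThrowingStream` below are hypothetical, and `MyObject` and `MyCustomError` are stand-in definitions, since the thread does not show the real ones. Note that the first thrown error ends the stream.

```swift
// Stand-in definitions; the real types are not shown in the thread.
struct MyObject: Sendable, Equatable { let id: Int }
enum MyCustomError: Error { case importantThingFailed }

// Hypothetical source of work: hands out queued objects, then nil.
actor WorkSource {
    private var pending: [MyObject]
    init(_ items: [MyObject]) { pending = items }

    func doSomeAsyncWork() throws -> MyObject? {
        pending.isEmpty ? nil : pending.removeFirst()
    }
}

func makeThrowingStream(from source: WorkSource) -> AsyncThrowingStream<MyObject, Error> {
    AsyncThrowingStream<MyObject, Error>(unfolding: {
        // Throwing here terminates the stream and surfaces the error to the consumer.
        guard let somethingImportant = try await source.doSomeAsyncWork() else {
            throw MyCustomError.importantThingFailed
        }
        return somethingImportant
    })
}
```

The consumer iterates with `for try await` inside a `do`/`catch`, and the loop exits as soon as the closure throws.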

I think your best bet here is to not throw at all then and use a normal AsyncStream - if the underlying call you’re using in that closure called by the stream throws, you can handle it with a do/catch block to make it not throw when the stream calls it. By definition, throwing an error means something unrecoverable has occurred in that call; if you want some other behavior, you’ll have to catch the error and handle it yourself by (for instance) retrying the underlying call and not letting the error propagate.

As a concrete example, you could do something like:

AsyncStream<MyObject> {
    while true {
        do {
            return try await doSomeAsyncWork()
        }
        catch {}
    }
}

But now it’s wholly up to you to handle what happens if doSomeAsyncWork just throws on every call (you likely want some limit as to how many times it retries before giving up).
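One way such a limit might look, as a sketch only: `FlakyWorker`, `TransientFailure`, and `makeRetryingStream` are hypothetical names, and the worker is scripted so the behavior is deterministic. After `maxAttempts` consecutive failures the closure returns nil, which ends the stream instead of looping forever.

```swift
struct TransientFailure: Error {}

// Hypothetical worker: each call consumes one scripted outcome
// (true = success, false = failure) and fails once the script runs out.
actor FlakyWorker {
    private var script: [Bool]
    private var produced = 0
    init(script: [Bool]) { self.script = script }

    func doSomeAsyncWork() throws -> Int {
        guard !script.isEmpty, script.removeFirst() else { throw TransientFailure() }
        produced += 1
        return produced
    }
}

func makeRetryingStream(worker: FlakyWorker, maxAttempts: Int) -> AsyncStream<Int> {
    AsyncStream<Int>(unfolding: {
        for _ in 0..<maxAttempts {
            do {
                return try await worker.doSomeAsyncWork()
            } catch {
                // Swallow the error and try again, up to maxAttempts times.
            }
        }
        return nil // Give up: returning nil finishes the stream.
    })
}
```

A real implementation would probably also want a delay or backoff between attempts; that is left out here to keep the sketch short.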

But the point is that you can’t throw an error and have the stream retry since an error is a signal used to tell the stream to terminate.
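For the delegate-bridging case mentioned earlier, another option (swapped in here as an alternative, not something proposed in the thread) is to make failures ordinary elements by streaming `Result` values. The `QuakeMonitor` class, `Quake`, `MonitorError`, and `quakeEvents(from:)` below are all hypothetical stand-ins for a callback-based API.

```swift
struct Quake: Sendable, Equatable { let magnitude: Double }
enum MonitorError: Error { case feedUnavailable }

// Hypothetical callback-based monitor standing in for a delegate API.
final class QuakeMonitor {
    var onEvent: ((Result<Quake, Error>) -> Void)?
    var onStop: (() -> Void)?

    func deliver(_ event: Result<Quake, Error>) { onEvent?(event) }
    func stop() { onStop?() }
}

func quakeEvents(from monitor: QuakeMonitor) -> AsyncStream<Result<Quake, Error>> {
    AsyncStream<Result<Quake, Error>> { continuation in
        // Failures are yielded like any other element, so the stream stays open.
        monitor.onEvent = { event in continuation.yield(event) }
        // Only an explicit stop finishes the stream.
        monitor.onStop = { continuation.finish() }
    }
}
```

The consumer then switches over each `Result` and decides per element whether a failure is worth reacting to, while the `for await` loop keeps running.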

Thank you very much, I appreciate the help. I'll try both approaches in a playground. :pray:

Using unfolding is nowhere near as versatile as the continuation method.

Also, if we want to iterate through a loop that calls sleep, for instance in a countdown, it gets really messy.

It would be really nice if we could do something like this:

let stream = AsyncStream<Double> { continuation in
            for i in stride(from: 0, to: 10, by: 0.5) {
                continuation.yield(Double(i))
                await Task.sleep(seconds: 1)
            }
        }

Instead, I end up with this abomination:

        var i = 20
        let streamUnfolding = AsyncStream<Double>(unfolding: {
            switch i {
            case 0:
                return nil
            default:
                i = i - 1
                return Double(i)
            }
        })

And the unfolding case references shared state, so if I try to execute this twice, it's game over, red rover...
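One possible way around the shared-state problem, sketched under the assumption that each stream should count down independently: create the state inside a factory function so every stream gets its own copy. The `Countdown` actor and `makeCountdownStream(from:)` are hypothetical names; an actor is used so the closure does not capture a mutable local variable.

```swift
// Hypothetical per-stream state holder.
actor Countdown {
    private var remaining: Int
    init(from start: Int) { remaining = start }

    // Mirrors the switch above: nil at zero, otherwise decrement and return.
    func next() -> Double? {
        guard remaining > 0 else { return nil }
        remaining -= 1
        return Double(remaining)
    }
}

func makeCountdownStream(from start: Int) -> AsyncStream<Double> {
    // A fresh Countdown per call, so streams never share state.
    let countdown = Countdown(from: start)
    return AsyncStream<Double>(unfolding: { await countdown.next() })
}
```

Calling `makeCountdownStream(from: 20)` twice gives two streams that each run the full countdown.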

The best solution I found for this is:

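        let stream = AsyncStream<Double> { continuation in
            Task {
                for i in stride(from: 0, to: 10, by: 0.5) {
                    continuation.yield(Double(i))
                    // Task.sleep(nanoseconds:) is the standard-library call; try? ignores cancellation
                    try? await Task.sleep(nanoseconds: 1_000_000_000)
                }
                // Finish the stream so a consumer's for-await loop can end
                continuation.finish()
            }
        }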
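If the Task-inside-the-closure pattern is kept, it may be worth tying the Task's lifetime to the stream so the producer stops when the consumer does. A sketch, assuming a simple ticker: `makeTicker(count:intervalNanoseconds:)` is a hypothetical helper, while `continuation.onTermination` and `Task.sleep(nanoseconds:)` are standard-library APIs.

```swift
func makeTicker(count: Int, intervalNanoseconds: UInt64) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        let producer = Task {
            for tick in 0..<count {
                continuation.yield(tick)
                do {
                    try await Task.sleep(nanoseconds: intervalNanoseconds)
                } catch {
                    break // sleep throws once the task has been cancelled
                }
            }
            // Tell consumers there are no more values.
            continuation.finish()
        }
        // Cancel the producer when the stream finishes or the consumer stops iterating.
        continuation.onTermination = { _ in producer.cancel() }
    }
}
```

Without the `onTermination` handler, the Task would keep sleeping and yielding into a stream nobody is reading until the loop runs out.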

I think most of the real-world use cases that need something like this won't be nearly as simple: the values will be produced asynchronously from the beginning, which sidesteps a lot of those problems.

Still, for this exact usage here's an off-the-top-of-my-head solution that's a little better, and probably closer to your intent of "produce values synchronously, with an additional layer of asynchronous delay":

let stream = AsyncStream<Double> { continuation in
	for i in stride(from: 0, to: 10, by: 0.5) {
		continuation.yield(Double(i))
	}
	continuation.finish()
}.map { element -> Double in
	try await Task.sleep(nanoseconds: 1_000_000_000)
	return element
}
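One detail worth noting about the snippet above: because the `map` closure can throw, the result is a throwing sequence and has to be consumed with `for try await`. A small sketch of that, wrapped in a hypothetical `collectDelayedValues(delayNanoseconds:)` function with a shorter range so it finishes quickly:

```swift
func collectDelayedValues(delayNanoseconds: UInt64) async throws -> [Double] {
    let stream = AsyncStream<Double> { continuation in
        // Values are produced synchronously and buffered by the stream.
        for i in stride(from: 0.0, to: 2.0, by: 0.5) {
            continuation.yield(i)
        }
        continuation.finish()
    }.map { element -> Double in
        // The delay is applied per element as the consumer pulls values.
        try await Task.sleep(nanoseconds: delayNanoseconds)
        return element
    }

    var collected: [Double] = []
    // `try` is required because the map closure can throw (e.g. on cancellation).
    for try await value in stream {
        collected.append(value)
    }
    return collected
}
```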

I think the idea is to break up complex tasks into small ones. We should end up with a number of very simple streams/activities that don't get messy.

Nice, thanks for reminding me about the transformation operations we can execute on the sequence!