Hi all, I am using an AsyncStream to propagate updates from a network download.
Inside the stream I call an async function, which forces me to wrap the entire body in a Task; otherwise I get an error. E.g.:
AsyncStream<Asset<IdentifiableType>> { continuation in
    Task {
        let someAsyncWorkResult = await someAsyncFunction()
    }
}
My main question is: is it OK to have an async unit of work inside an async stream? (I haven't observed any issues yet.) Is there a nicer way of handling the above?
Yes, but I don't see the AsyncThrowingStream having the async function init. (Sorry, I'm still quite new to async/await)
Also, I wouldn't want to terminate the stream on a throw. (For context, I'm using it to bridge a delegate callback to the async world, similar to Apple's example with LocationManager and the quake monitor.)
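For readers unfamiliar with that pattern, here is a minimal sketch of bridging delegate callbacks into an AsyncStream. Everything named here (DownloadDelegate, Downloader, StreamingDelegate, progressUpdates) is hypothetical and only stands in for a real delegate-based API; the simulated downloader fires its callbacks synchronously to keep the example self-contained.

```swift
// Hypothetical delegate protocol; stands in for a real delegate-based API.
protocol DownloadDelegate: AnyObject {
    func didReceive(progress: Double)
    func didFinish()
}

// Hypothetical download source that simulates callbacks from the network layer.
final class Downloader {
    weak var delegate: DownloadDelegate?

    func start() {
        for step in 1...4 {
            delegate?.didReceive(progress: Double(step) / 4)
        }
        delegate?.didFinish()
    }
}

// Adapter that forwards each delegate callback into the stream's continuation.
final class StreamingDelegate: DownloadDelegate, Sendable {
    private let continuation: AsyncStream<Double>.Continuation

    init(continuation: AsyncStream<Double>.Continuation) {
        self.continuation = continuation
    }

    func didReceive(progress: Double) { continuation.yield(progress) }
    func didFinish() { continuation.finish() }
}

func progressUpdates(from downloader: Downloader) -> AsyncStream<Double> {
    AsyncStream<Double> { continuation in
        let adapter = StreamingDelegate(continuation: continuation)
        downloader.delegate = adapter
        // The delegate reference is weak, so the termination handler
        // keeps the adapter alive until the stream ends.
        continuation.onTermination = { _ in _ = adapter }
        downloader.start()
    }
}
```

Because the callbacks are synchronous, no Task is needed in this case; the wrapper Task only becomes necessary once the body itself has to await something.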
I think your best bet here is to not throw at all then and use a normal AsyncStream - if the underlying call you’re using in that closure called by the stream throws, you can handle it with a do/catch block to make it not throw when the stream calls it. By definition, throwing an error means something unrecoverable has occurred in that call; if you want some other behavior, you’ll have to catch the error and handle it yourself by (for instance) retrying the underlying call and not letting the error propagate.
As a concrete example, you could do something like:
AsyncStream<MyObject>(unfolding: {
    while true {
        do {
            return try await doSomeAsyncWork()
        } catch {
            // Swallow the error and retry on the next loop iteration.
        }
    }
})
But now it’s wholly up to you to handle what happens if doSomeAsyncWork just throws on every call (you likely want some limit as to how many times it retries before giving up).
But the point is that you can’t throw an error and have the stream retry since an error is a signal used to tell the stream to terminate.
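To make the retry limit concrete, here is a rough sketch of the same idea with a bounded number of attempts per element. FlakyError, FlakySource, retryingStream and maxAttempts are all made up for illustration; FlakySource just simulates a call that fails twice and then succeeds. Returning nil from the unfolding closure is what ends the stream once the attempts run out.

```swift
struct FlakyError: Error {}

// Hypothetical stand-in for the throwing async call:
// fails on two calls out of every three.
actor FlakySource {
    private var calls = 0

    func doSomeAsyncWork() throws -> Int {
        calls += 1
        if calls % 3 != 0 { throw FlakyError() }
        return calls
    }
}

func retryingStream(source: FlakySource, maxAttempts: Int) -> AsyncStream<Int> {
    AsyncStream<Int>(unfolding: {
        for _ in 0..<maxAttempts {
            // Stop retrying if the consuming task has been cancelled.
            if Task.isCancelled { return nil }
            do {
                return try await source.doSomeAsyncWork()
            } catch {
                // Swallow the error and try again.
            }
        }
        // Every attempt failed: give up and end the stream.
        return nil
    })
}
```

With maxAttempts set to 3 each element succeeds on its third try; with 2 the first element never succeeds and the stream ends empty.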
Using unfolding is nowhere near as versatile as the continuation method.
Also, if we want to iterate through a loop calling sleep, for instance in a countdown, it gets really messy.
It would be really nice if we could do something like this:
let stream = AsyncStream<Double> { continuation in
    // Does not compile: the build closure is synchronous, so `await` is not allowed here.
    for i in stride(from: 0, to: 10, by: 0.5) {
        continuation.yield(Double(i))
        try await Task.sleep(nanoseconds: 1_000_000_000)
    }
}
Instead, I end up with this abomination:
var i = 20
let streamUnfolding = AsyncStream<Double>(unfolding: {
    switch i {
    case 0:
        return nil
    default:
        i = i - 1
        return Double(i)
    }
})
And the unfolding case references shared state, so if I try to execute this twice, it's game over red rover...
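One possible way around the shared-state problem while staying with unfolding is to give each stream its own state. The sketch below is only an illustration: Countdown and makeCountdownStream are invented names, and using an actor for the counter is one choice among several.

```swift
// Holds the countdown state for a single stream.
// An actor keeps the mutation safe across await points.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    // Mirrors the switch above: decrement first, then emit; nil ends the stream.
    func next() -> Double? {
        guard remaining > 0 else { return nil }
        remaining -= 1
        return Double(remaining)
    }
}

func makeCountdownStream(from start: Int) -> AsyncStream<Double> {
    // Each call creates fresh state, so streams do not interfere with each other.
    let state = Countdown(from: start)
    return AsyncStream<Double>(unfolding: { await state.next() })
}
```

Calling makeCountdownStream(from:) twice gives two independent countdowns, at the cost of an extra type.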
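The best solution I found for this is:
let stream = AsyncStream<Double> { continuation in
    let task = Task {
        for i in stride(from: 0, to: 10, by: 0.5) {
            continuation.yield(Double(i))
            // Sleeping throws if the task is cancelled; stop producing in that case.
            do { try await Task.sleep(nanoseconds: 1_000_000_000) } catch { break }
        }
        // Without this the consumer's loop would never end.
        continuation.finish()
    }
    // Cancel the producer when the consumer stops iterating.
    continuation.onTermination = { _ in task.cancel() }
}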
I think most of the real-world use cases that need something like this won't be nearly as simple: the values will be produced asynchronously from the start, which sidesteps a lot of those problems.
Still, for this exact usage here's an off-the-top-of-my-head solution that probably captures your intent of "produce values synchronously, with an additional layer of asynchronous delay" a little better:
let stream = AsyncStream<Double> { continuation in
    for i in stride(from: 0, to: 10, by: 0.5) {
        continuation.yield(Double(i))
    }
    continuation.finish()
}.map { element -> Double in
    try await Task.sleep(nanoseconds: 1_000_000_000)
    return element
}
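One thing worth knowing about this version: because the map closure can throw, the result is no longer an AsyncStream but an AsyncThrowingMapSequence, so it has to be consumed with `for try await`. A small sketch of what that looks like follows; delayedValues, collectDelayedValues and the delayNanoseconds parameter are names invented for the example, and the range is shortened so it finishes quickly.

```swift
func delayedValues(
    delayNanoseconds: UInt64
) -> AsyncThrowingMapSequence<AsyncStream<Double>, Double> {
    AsyncStream<Double> { continuation in
        // Values are produced synchronously and buffered by the stream.
        for i in stride(from: 0, to: 2, by: 0.5) {
            continuation.yield(i)
        }
        continuation.finish()
    }.map { element -> Double in
        // The delay is applied as each element is pulled by the consumer.
        try await Task.sleep(nanoseconds: delayNanoseconds)
        return element
    }
}

func collectDelayedValues() async -> [Double] {
    var collected: [Double] = []
    do {
        for try await value in delayedValues(delayNanoseconds: 1_000_000) {
            collected.append(value)
        }
    } catch {
        // Task.sleep throws if the surrounding task is cancelled;
        // in that case we simply return what was collected so far.
    }
    return collected
}
```

The delay here happens on the consuming side, so cancelling the consumer's task stops the sequence without any extra termination handling.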