Multiple AsyncSequences from one?

Let’s say we have AsyncStream<Result<Value, Error>>. Is it possible to split it into two streams: one for values, another for errors?

I tried doing two separate compactMap sequences, filtering the .success() and the .failure() but they work only if there’s only one of them.

Example (even/odd instead of Result for simplicity):

let integers = AsyncStream<Int> { continuation in
    Task {
        var size = 10
        while !Task.isCancelled && size > 0 {
            try? await Task.sleep(for: .milliseconds(300))
            continuation.yield(size)
            size -= 1
        }
        continuation.finish()
    }
}

let evens = integers.compactMap { i in
    i % 2 == 0 ? i : nil
}
let odds = integers.compactMap { i in
    i % 2 == 0 ? nil : i
}
for await e in evens {
    print("even:", e)
}
for await o in odds {
    print("odd:", o)
}
print("Print over.")
Output
even: 10
even: 8
even: 6
even: 4
even: 2
Print over.

This means that AsyncSequence is consumed. Which makes it very different from Combine publishers. Usually people here suggest to switch from Combine to AsyncStream, not paying attention to this fact!

1 Like

Here's another example which is beyond my understanding:

let integers = ... // Same as above
let evens = integers.compactMap { i in
    i % 2 == 0 ? i : nil
}
let odds = integers.compactMap { i in
    i % 2 == 0 ? nil : i
}
await withTaskGroup { group in
    group.addTask {
        for await o in odds {
            print("odd:", o)
        }
        print("Odds done.")
    }
    group.addTask {
        for await e in evens {
            print("even:", e)
        }
        print("Evens done.")
    }
    await group.waitForAll()
    print("Group done.")
}
Output
Odds done.
Evens done.
Group done.

For my use case I made this restricted clone of Publisher's share() based on escaping continuations. Since documentation explicitely mentions, that the Continuation supports escaping, I believe it's valid. So far it works. Feel free to use and contribute.

extension AsyncStream where Element: Sendable {
    func double() -> (AsyncStream<Element>, AsyncStream<Element>) {
        var continuation1: AsyncStream<Element>.Continuation!
        var continuation2: AsyncStream<Element>.Continuation!
        let stream1 = AsyncStream { continuation in
            continuation1 = continuation
        }
        let stream2 = AsyncStream { continuation in
            continuation2 = continuation
        }
        Task {
            for await element in self {
                continuation1.yield(element)
                continuation2.yield(element)
            }
            continuation1.finish()
            continuation2.finish()
        }
        return (stream1, stream2)
    }
}


let integers = ... // as above

let (integers1, integers2) = integers.double()

let evens = integers1.compactMap { i in
    i % 2 == 0 ? i : nil
}
let odds = integers2.compactMap { i in
    i % 2 == 0 ? nil : i
}
for await e in evens {
    print("even:", e)
}
for await o in odds {
    print("odd:", o)
}
print("Print over.")
Output
even: 10
even: 8
even: 6
even: 4
even: 2
odd: 9
odd: 7
odd: 5
odd: 3
odd: 1
Print over.
2 Likes

Check out swift-async-algorithms/Sources/AsyncAlgorithms/AsyncShareSequence.swift at main · apple/swift-async-algorithms · GitHub (not included in the latest release)

More on this solution here: Kickoff of a new season of development for AsyncAlgorithms; Share

1 Like

Thanks for the sharing, I actually was looking at async algoritms, but didn't found this one in Readme.

Share is the correct tool for the job here:

import AsyncAlgorithms

let integers = AsyncStream<Int> { continuation in
    Task {
        var size = 10
        while !Task.isCancelled && size > 0 {
            try? await Task.sleep(for: .milliseconds(300))
            continuation.yield(size)
            size -= 1
        }
        continuation.finish()
    }
}.share() // this permits the AsyncSequence to be shared across 
          // multiple "sides" doing different things. 
          // By default it will make sure that all sides consume
          // at a similar rate by awaiting the other sides to be 
          // fulfilled for their calls to next.

let evens = integers.compactMap { i in
    i % 2 == 0 ? i : nil
}
let odds = integers.compactMap { i in
    i % 2 == 0 ? nil : i
}
3 Likes

@jamieQ Could this be a possible bug? I found sometimes it would print all the numbers while most often it did not. Furthermore explicitly specifying childTaskResultType lead to a higher occurrence where all the numbers would be printed. Did only do a quick test but found no reliable pattern, what do you think?

i think this is just a confusing consequence of sharing the same underlying AsyncStream across multiple 'consumers'. the problem with doing this without explicitly replicating the stream's elements per 'consumer' is that delivery order can be arbitrary depending on the configuration (see my confused reaction upon initially learning about this behavior).

so what i think is going on in this case is that there's the one 'source' stream:

let integers = AsyncStream<Int> { continuation in
    Task {
        var size = 10
        while !Task.isCancelled && size > 0 {
            try? await Task.sleep(for: .milliseconds(300))
            continuation.yield(size)
            size -= 1
        }
        continuation.finish()
    }
}

and then the code that transforms the source sequence:

let evens = integers.compactMap { i in
    i % 2 == 0 ? i : nil
}
let odds = integers.compactMap { i in
    i % 2 == 0 ? nil : i
}

and lastly, there's the code that actually performs the iteration:

await withTaskGroup { group in
    group.addTask {
        for await o in odds {
            print("odd:", o)
        }
        print("Odds done.")
    }
    group.addTask {
        for await e in evens {
            print("even:", e)
        }
        print("Evens done.")
    }
    await group.waitForAll()
    print("Group done.")
}

since the iterating child tasks are concurrent with respect to each other, they will race for the initial consumption. if the odd map sequence 'wins', it will get an even value first, effectively seeing nil. then, due to implementation details of how AsyncStream manages multiple consumers (i think they are FIFO, but it's been a while since i looked closely), the next value will be delivered to the even map sequence. but of course that value will be odd, so that too will return nil. so the sequencing happens to align such that both map sequences effectively filter everything out. of course if the race goes the other way then it 'works'.

if you serialize the child tasks by putting them on the same global actor (not something you'd actually want to do with that API in general), and ensure that even iterator is the first one enqueued:

await withTaskGroup(of: Void.self) { group in
    group.addTask { @Sendable @MainActor in
        for await e in evens {
            print("even:", e)
        }
        print("Evens done.")
    }
    group.addTask { @Sendable @MainActor in
        for await o in odds {
            print("odd:", o)
        }
        print("Odds done.")
    }
    await group.waitForAll()
    print("Group done.")
}

then i think the output will print the expected, consistent results.

output
even: 10
odd: 9
even: 8
odd: 7
even: 6
odd: 5
even: 4
odd: 3
even: 2
Evens done.
odd: 1
Odds done.
Group done.
1 Like

(post deleted by author)

Thanks for the thorough replay, this indeed had me confused as well. Looking over the linked thread and the related GitHub issue, it would be perhaps worth considering finally banning or at least emitting a warning when such concurrent iterations occurs.

P.S Interesting to know that there is behavioral difference between the two versions of AsyncStream, do you know if this is an exception or if there are other constructs with differing behavior? I haven’t really looked into this yet.