Merging an array of async Streams into a single stream

I've just enabled Swift 6 for my package, and now I'm trying to catch up on all the compiler errors I get due to the new concurrency checking.

One issue that I'm struggling with is merging an array of Async Streams into a single Async Stream. For that, I'm using AsyncAlgorithms, and I don't know how to construct a clean AsyncStream of the intermediate result (which is a generic AsyncSequence)... whatever I do, I get an error that I'm passing a closure into a Task, which is not allowed.

This is my code:

import Foundation
import AsyncAlgorithms

extension Collection where Element == AsyncStream<MyType> {
    /// Merges a collection of streams into a single stream.
    func merge() -> AsyncStream<MyType> {
        guard let first = self.first else {
            return .init { continuation in continuation.finish() }
        }

        return self[index(after: startIndex)...].reduce(first) { partialResult, stream in
            AsyncAlgorithms.merge(partialResult, stream).toAsyncStream()
        }
    }
}

/// Converts an async sequence to an async stream.
///
/// > Important: This is fileprivate because in general an async sequence can throw and that case is not handled here.
fileprivate extension AsyncSequence where Element: Sendable {
    func toAsyncStream() -> AsyncStream<Element> {
        .init { continuation in
            let childTask = Task { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
                for try await element in self {
                    continuation.yield(element)
                }
                continuation.finish()
            }

            continuation.onTermination = { termination in
                switch termination {
                case .finished:
                    break
                case .cancelled:
                    childTask.cancel()
                @unknown default:
                    break
                }
            }
        }
    }
}

Can somebody please explain how I can merge all my streams together correctly?

Edit: Also when using other approaches like a task group, the error persists; I cannot pass the continuation, which I must use to initialize a new stream, to the corresponding child tasks.

// Function to merge multiple async sequences into one
func mergeAsyncSequences<T: AsyncSequence>(_ sequences: [T]) -> AsyncStream<T.Element> where T.Element: Sendable {
    return AsyncStream { continuation in
        // Track the number of active tasks to finish when all are completed
        let taskGroup = Task { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
            await withThrowingTaskGroup(of: Void.self) { group in
                for sequence in sequences {
                    group.addTask { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
                        for try await element in sequence {
                            continuation.yield(element)
                        }
                    }
                }
            }
            // Close the continuation when all tasks complete
            continuation.finish()
        }
    }
}

i believe the concurrency errors in both these cases are due to the fact that the types conforming to AsyncSequence are not known to be Sendable, nor are the relevant parameters sending (i.e. in a 'disconnected isolation region')[1]. there are a couple options i can think of to address this issue:

option 1: add Sendable requirements

the async stream conversion utility function is currently declared as:

fileprivate extension AsyncSequence where Element: Sendable {
    func toAsyncStream() -> AsyncStream<Element> { ... }
}

the errors at the Task closure are due to the fact that self is passed in, but self isn't known to the compiler as being Sendable (it is not constrained as such in the extension) so its use in the Task's closure is a problem.

if the extension constrains the sequence to be Sendable, the errors are resolved:

fileprivate extension AsyncSequence where Self: Sendable, Element: Sendable { ... }
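
for illustration, here is a minimal sketch of the whole helper with that constraint in place. it is not the only way to write it; in particular, ending the stream when the source sequence throws is an assumption on my part, since the original helper leaves that case unhandled.

```swift
fileprivate extension AsyncSequence where Self: Sendable, Element: Sendable {
    /// Converts an async sequence to an async stream.
    /// Assumption: an error thrown by the source simply ends the stream.
    func toAsyncStream() -> AsyncStream<Element> {
        AsyncStream { continuation in
            // `self` is known to be Sendable, so the Task closure may capture it.
            let childTask = Task {
                do {
                    for try await element in self {
                        continuation.yield(element)
                    }
                } catch {
                    // Assumption: the error is dropped; the stream finishes below.
                }
                continuation.finish()
            }

            // Stop pulling from the source once the consumer goes away.
            continuation.onTermination = { termination in
                if case .cancelled = termination {
                    childTask.cancel()
                }
            }
        }
    }
}
```

with the do/catch in place the stream also finishes when the source throws, instead of being left open.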

option 2: extend the concrete type AsyncMerge2Sequence

in this instance, the type of the merged sequences is known to be AsyncMerge2Sequence, which is Sendable under certain conditions (in this case, as long as MyType is also Sendable, which i've assumed is the case). since the concrete type is known, the utility for converting to an AsyncStream could instead be declared in an extension on that particular type, which would then propagate the sendability. e.g.

fileprivate extension AsyncMerge2Sequence {
    func toAsyncStream() -> AsyncStream<Element> { ... }
}

this approach is a bit less general, but may be sufficient in this case.
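
the same reasoning should carry over to the task-group version from the question: if the function takes the concrete type AsyncStream<Element> (which is Sendable when Element is) instead of a generic T: AsyncSequence, everything captured by the task closures is known to be Sendable. a rough sketch, where mergeStreams is a hypothetical name and not part of AsyncAlgorithms:

```swift
/// Merges streams by forwarding every element into one new stream.
/// `mergeStreams` is a hypothetical helper name used for illustration.
func mergeStreams<Element: Sendable>(_ streams: [AsyncStream<Element>]) -> AsyncStream<Element> {
    AsyncStream { continuation in
        let forwarding = Task {
            await withTaskGroup(of: Void.self) { group in
                for stream in streams {
                    // `stream` and `continuation` are both Sendable here.
                    group.addTask {
                        for await element in stream {
                            continuation.yield(element)
                        }
                    }
                }
                // The group waits for all child tasks before returning.
            }
            // Close the merged stream once every source has finished.
            continuation.finish()
        }

        // Cancel the forwarding task (and its children) when the consumer stops.
        continuation.onTermination = { _ in
            forwarding.cancel()
        }
    }
}
```

elements from different source streams can arrive in any interleaving, so only the relative order within each source is preserved.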


  1. see SE-0414 (region-based isolation) and SE-0430 (sending parameter and result values) for more info & terminology
