I've just enabled Swift6 for my Package, and now I'm trying to catch up on all the compiler errors I get due to the new concurrency checking.
One issue that I'm struggling with is merging an array of Async Streams into a single Async Stream. For that, I'm using AsyncAlgorithms, and I don't know how to construct a clean AsyncStream of the intermediate result (which is a generic AsyncSequence)... whatever I do, I get an error that I'm passing a closure into a Task, which is not allowed.
This is my code:
import Foundation
import AsyncAlgorithms
extension Collection where Element == AsyncStream<MyType> {
/// Merges a collection of streams into a single stream.
func merge() -> AsyncStream<MyType> {
guard let first = self.first else {
return .init { continuation in continuation.finish() }
}
return self[index(after: startIndex)...].reduce(first) { partialResult, stream in
AsyncAlgorithms.merge(partialResult, stream).toAsyncStream()
}
}
}
/// Converts a async sequence to a async stream.
///
/// > Important: This is fileprivate because in general a async sequence can throw and that case is not handled here.
fileprivate extension AsyncSequence where Element: Sendable {
func toAsyncStream() -> AsyncStream<Element> {
.init { continuation in
let childTask = Task { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
for try await element in self {
continuation.yield(element)
}
continuation.finish()
}
continuation.onTermination = { termination in
switch termination {
case .finished:
break
case .cancelled:
childTask.cancel()
@unknown default:
break
}
}
}
}
}
Can somebody please explain how I can merge all my streams together correctly?
Edit: Also when using other approaches like a task group, the error persists; I cannot pass the continuation, which I must use to initialize a new stream, to the corresponding child tasks.
// Function to merge multiple async sequences into one
func mergeAsyncSequences<T: AsyncSequence>(_ sequences: [T]) -> AsyncStream<T.Element> where T.Element: Sendable {
return AsyncStream { continuation in
// Track the number of active tasks to finish when all are completed
let taskGroup = Task { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
await withThrowingTaskGroup(of: Void.self) { group in
for sequence in sequences {
group.addTask { // ERROR: Passing closure as a 'sending' parameter risks causing data races between code in the current task and concurrent execution of the closure
for try await element in sequence {
continuation.yield(element)
}
}
}
}
// Close the continuation when all tasks complete
continuation.finish()
}
}
}