Hi,
Is there any interest in this repo in extending the operators that involve cardinality (merge, zip, combineLatest) to variadic-parameter counterparts?
It would require being able to type-erase an AsyncSequence at some point. For merge, a possible implementation would be something like:
/// Merges any number of async sequences of the same type into one type-erased sequence.
///
/// With no arguments the result is an erased `AsyncEmptySequence`. Otherwise the
/// sequences are folded left to right with the two-argument `merge`, erasing the
/// intermediate result after every step so the accumulator keeps a single type.
///
/// - Parameter bases: The sequences to merge.
/// - Returns: A type-erased sequence built by pairwise merging `bases`.
///
/// NOTE(review): this relies on `merge` accepting an `AnyAsyncSequence` as its first
/// argument — confirm against `merge`'s own generic requirements.
public func mergeMany<Base: AsyncSequence>(_ bases: Base...) -> AnyAsyncSequence<Base.Element>
where
  Base: Sendable,
  Base.Element: Sendable,
  Base.AsyncIterator: Sendable {
  guard let first = bases.first else {
    return AsyncEmptySequence<Base.Element>().eraseToAnyAsyncSequence()
  }

  var merged = first.eraseToAnyAsyncSequence()
  for base in bases.dropFirst() {
    merged = merge(merged, base).eraseToAnyAsyncSequence()
  }
  return merged
}
where:
/// An `AsyncSequence` that never produces an element.
///
/// Its iterator returns `nil` from the very first call to `next()`.
public struct AsyncEmptySequence<Element>: AsyncSequence {
  public typealias Element = Element
  public typealias AsyncIterator = Iterator

  /// Iterator whose `next()` always returns `nil`.
  public struct Iterator: AsyncIteratorProtocol {
    /// - Returns: Always `nil`.
    public func next() async -> Element? {
      return nil
    }
  }

  /// Creates an empty sequence.
  public init() {}

  /// - Returns: A fresh iterator that yields no elements.
  public func makeAsyncIterator() -> AsyncIterator {
    return Iterator()
  }
}
and
extension AsyncSequence {
  /// Wraps this sequence in an `AnyAsyncSequence`, hiding its concrete type.
  /// - Returns: An `AnyAsyncSequence` built from `self`.
  public func eraseToAnyAsyncSequence() -> AnyAsyncSequence<Element> {
    return .init(self)
  }
}
/// Type-erased version of an `AsyncSequence`.
///
/// Stores a closure that builds a type-erased iterator from the wrapped sequence,
/// so callers only see the `Element` type.
public struct AnyAsyncSequence<Element>: AsyncSequence {
  public typealias Element = Element
  public typealias AsyncIterator = Iterator

  /// Builds a new type-erased iterator over the wrapped sequence.
  private let makeAsyncIteratorClosure: () -> AsyncIterator

  /// Wraps `baseAsyncSequence`, erasing its concrete type.
  /// - Parameter baseAsyncSequence: The sequence to wrap; its elements must be `Element`.
  public init<BaseAsyncSequence: AsyncSequence>(_ baseAsyncSequence: BaseAsyncSequence) where BaseAsyncSequence.Element == Element {
    self.makeAsyncIteratorClosure = { Iterator(baseIterator: baseAsyncSequence.makeAsyncIterator()) }
  }

  /// - Returns: A type-erased iterator over the wrapped sequence.
  public func makeAsyncIterator() -> AsyncIterator {
    // The stored closure already returns an erased `Iterator`; wrapping it in a
    // second `Iterator` would only add an extra closure hop to every `next()`.
    makeAsyncIteratorClosure()
  }

  /// Type-erased iterator that forwards `next()` to the wrapped base iterator.
  public struct Iterator: AsyncIteratorProtocol {
    /// Forwards to the base iterator's `next()`.
    private let nextClosure: () async throws -> Element?

    /// Wraps `baseIterator`, erasing its concrete type.
    /// - Parameter baseIterator: The iterator to wrap; its elements must be `Element`.
    public init<BaseAsyncIterator: AsyncIteratorProtocol>(baseIterator: BaseAsyncIterator) where BaseAsyncIterator.Element == Element {
      // A mutable copy is captured so the closure can call the base's mutating `next()`.
      var baseIterator = baseIterator
      self.nextClosure = { try await baseIterator.next() }
    }

    /// Produces the next element of the wrapped iterator.
    /// - Returns: The next element, or `nil` when the wrapped iterator returns `nil`.
    /// - Throws: Whatever the wrapped iterator's `next()` throws.
    public func next() async throws -> Element? {
      try await nextClosure()
    }
  }
}
Another implementation would be to reuse the existing code, but with an Array of PartialIteration<Base.AsyncIterator, Partial> as the state, going through all of its elements on each call to next to compute the Tasks to give to Task.select.
What is your take on that?
Thanks a lot.