per your suggestion here @FranzBusch, i did a brief investigation of how the async stream variants might be refactored so they share more internal logic, and wanted to document my findings thus far (messy draft PR here).
i tried two approaches β one was creating new internal types for all the duplicated nested types (yield result, buffer limit, storage, etc), and the other was trying to replace the AsyncStream internal storage by just re-using the analogous types from AsyncThrowingStream<Element, Never>. i'm inclined to think the latter is better as it seems it would require fewer changes overall.
setting aside the slight behavioral differences in how iteration & termination currently work that would need to be resolved, i hit a few other problems.
one is that there is some boilerplate. since all the types involved are nested you need shims to translate from the public API to the internal one and back. this seems straightforward in most cases, but it's a little clunky for, e.g., the termination handler because you need something like a thunk to map between things (maybe there is a better way to handle this?).
the thing that seems most like a potential blocker is the next() method. currently the implementation within AsyncStream._Storage is this:
func next() async throws -> Element? {
try await withTaskCancellationHandler {
try unsafe await withUnsafeThrowingContinuation {
unsafe next($0)
}
} onCancel: { [cancel] in
cancel()
}
}
ideally i think we'd use a single implementation with roughly this signature:
func next() async throws(Failure) -> Element?
i could get this to compile locally by using this implementation:
func next() async throws(Failure) -> Element? {
try await withTaskCancellationHandler { () async throws(Failure) -> Element? in
do {
return try unsafe await withUnsafeThrowingContinuation { continuation in
// TODO: is this the right way to convert to a typed throws continuation?
unsafe next(UnsafeContinuation<Element?, Failure>(continuation.context))
}
} catch {
// TODO: any way around this?
throw error as! Failure
}
} onCancel: { [cancel] in
cancel()
}
}
and the tests i ran locally seemed to pass. however, there are a few issues with this i don't quite understand.
the first is whether this sort of thing is even reasonable:
// continuation is an UnsafeContinuation<Element?, any Error>.
UnsafeContinuation<Element?, Failure>(continuation.context)
i.e. is this the right way to convert from UnsafeContinuation<T, any Error> to UnsafeContinuation<T, SpecificError>, and if not, what is?
the second issue is that when i ran this on CI i hit some build errors implicating embedded swift that i don't really understand. the build logs contained:
11:26:30 /Users/ec2-user/jenkins/workspace-private/swift-PR-source-compat-suite-debug-macos/swift/stdlib/public/Concurrency/PartialAsyncTask.swift:762:64: error: cannot use a value of protocol type 'any Error' in embedded Swift [#]8;;[https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions\EmbeddedRestrictions]8;;\](https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions/EmbeddedRestrictions]8;;/)]
11:26:30 760 | public func resume(throwing error: consuming E) {
11:26:30 761 | #if compiler(>=5.5) && $BuiltinContinuation
11:26:30 762 | unsafe Builtin.resumeThrowingContinuationThrowing(context, error)
11:26:30 | `- error: cannot use a value of protocol type 'any Error' in embedded Swift [#]8;;[https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions\EmbeddedRestrictions]8;;\](https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions/EmbeddedRestrictions]8;;/)]
11:26:30 763 | #else
11:26:30 764 | fatalError("Swift compiler is incompatible with this SDK version")
11:26:30
11:26:30 /Users/ec2-user/jenkins/workspace-private/swift-PR-source-compat-suite-debug-macos/swift/stdlib/public/Concurrency/AsyncStream.swift:455:20: note: generic specialization called here
11:26:30 453 | @discardableResult
11:26:30 454 | public func yield() -> YieldResult where Element == Void {
11:26:30 455 | return storage.yield(()).nonThrowingRepresentation
11:26:30 | `- note: generic specialization called here
11:26:30 456 | }
11:26:30 457 | }
11:26:30
11:26:30 [#EmbeddedRestrictions]: <https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions>
it looks to me like it's complaining about a callsite that should be specialized to have a Never-throwing function but it seems to think it throws an existential error for some reason. this further makes me wonder if some changes might be needed in withUnsafeThrowingContinuation to do this sort of thing.
my current outstanding questions are:
does this embedded swift CI error make sense to anyone? does embedded swift need some sort of special handling? does withUnsafeThrowingContinuation need to adopt typed throws before this sort of refactor can be done?