On unifying the Async{Throwing}Stream internal implementation

per your suggestion here @FranzBusch, i did a brief investigation of how the async stream variants might be refactored so they share more internal logic, and wanted to document my findings thus far (messy draft PR here).

i tried two approaches – one was creating new internal types for all the duplicated nested types (yield result, buffer limit, storage, etc), and the other was trying to replace the AsyncStream internal storage by just re-using the analogous types from AsyncThrowingStream<Element, Never>. i'm inclined to think the latter is better as it seems it would require fewer changes overall.

setting aside the slight behavioral differences in how iteration & termination currently work that would need to be resolved, i hit a few other problems.

one is that there is some boilerplate. since all the types involved are nested, you need shims to translate from the public API to the internal one and back. this seems straightforward in most cases, but it's a little clunky for, e.g., the termination handler, because you need something like a thunk to map between the public and internal termination types (maybe there is a better way to handle this?).
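
A minimal sketch of the kind of thunk described above, assuming the public termination type has bare `finished`/`cancelled` cases while the internal one carries an optional failure. The names `PublicTermination`, `InternalTermination`, and `makeInternalHandler` are hypothetical stand-ins for illustration, not the stdlib's actual internals:

```swift
// Hypothetical stand-in for the non-throwing stream's nested termination type.
enum PublicTermination: Equatable {
  case finished
  case cancelled
}

// Hypothetical stand-in for the throwing stream's nested termination type.
enum InternalTermination<Failure: Error> {
  case finished(Failure?)
  case cancelled
}

/// Wraps a handler written against the public type so it can be stored
/// where a handler over the internal type is expected.
func makeInternalHandler(
  _ handler: @escaping @Sendable (PublicTermination) -> Void
) -> @Sendable (InternalTermination<Never>) -> Void {
  return { termination in
    switch termination {
    case .finished:
      // With Failure == Never the payload can only be nil, so it is dropped.
      handler(.finished)
    case .cancelled:
      handler(.cancelled)
    }
  }
}
```

Reading the handler back out through the public API would need the inverse mapping, which is part of why this shim feels clunky.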

the thing that seems most like a potential blocker is the next() method. currently the implementation within AsyncThrowingStream._Storage is this:

func next() async throws -> Element? {
  try await withTaskCancellationHandler {
    try unsafe await withUnsafeThrowingContinuation {
      unsafe next($0)
    }
  } onCancel: { [cancel] in
    cancel()
  }
}

ideally i think we'd use a single implementation with roughly this signature:

func next() async throws(Failure) -> Element?
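
To illustrate why a single typed-throws signature could serve both stream variants, here is a small synchronous sketch. `SharedStorageSketch`, `DemoFailure`, and `useBoth` are hypothetical names, and the stored `result` merely fakes whatever the real storage would produce. It assumes a Swift 6 compiler, where `Result.get()` uses typed throws:

```swift
// Hypothetical stand-in for a shared storage type; `result` fakes the next event.
struct SharedStorageSketch<Element, Failure: Error> {
  var result: Result<Element?, Failure>

  // One implementation intended to serve both throwing and non-throwing users.
  func next() throws(Failure) -> Element? {
    try result.get()
  }
}

struct DemoFailure: Error {}

func useBoth() throws(DemoFailure) -> (Int?, Int?) {
  let plain = SharedStorageSketch<Int, Never>(result: .success(1))
  let throwing = SharedStorageSketch<Int, DemoFailure>(result: .success(2))
  let a = plain.next()         // no `try`: Failure is Never for this instance
  let b = try throwing.next()  // `try` required: Failure is DemoFailure
  return (a, b)
}
```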

i could get this to compile locally by using this implementation:

func next() async throws(Failure) -> Element? {
  try await withTaskCancellationHandler { () async throws(Failure) -> Element? in
    do {
      return try unsafe await withUnsafeThrowingContinuation { continuation in
        // TODO: is this the right way to convert to a typed throws continuation?
        unsafe next(UnsafeContinuation<Element?, Failure>(continuation.context))
      }
    } catch {
      // TODO: any way around this?
      throw error as! Failure
    }
  } onCancel: { [cancel] in
    cancel()
  }
}

and the tests i ran locally seemed to pass. however, there are a few issues with this i don't quite understand.

the first is whether this sort of thing is even reasonable:

// continuation is an UnsafeContinuation<Element?, any Error>.
UnsafeContinuation<Element?, Failure>(continuation.context)

i.e. is this the right way to convert from UnsafeContinuation<T, any Error> to UnsafeContinuation<T, SpecificError>, and if not, what is?

the second issue is that when i ran this on CI i hit some build errors implicating embedded swift that i don't really understand. the build logs contained:

11:26:30  /Users/ec2-user/jenkins/workspace-private/swift-PR-source-compat-suite-debug-macos/swift/stdlib/public/Concurrency/PartialAsyncTask.swift:762:64: error: cannot use a value of protocol type 'any Error' in embedded Swift [#EmbeddedRestrictions]
11:26:30  760 |   public func resume(throwing error: consuming E) {
11:26:30  761 |     #if compiler(>=5.5) && $BuiltinContinuation
11:26:30  762 |     unsafe Builtin.resumeThrowingContinuationThrowing(context, error)
11:26:30      |                                                                `- error: cannot use a value of protocol type 'any Error' in embedded Swift [#EmbeddedRestrictions]
11:26:30  763 |     #else
11:26:30  764 |     fatalError("Swift compiler is incompatible with this SDK version")
11:26:30  
11:26:30  /Users/ec2-user/jenkins/workspace-private/swift-PR-source-compat-suite-debug-macos/swift/stdlib/public/Concurrency/AsyncStream.swift:455:20: note: generic specialization called here
11:26:30  453 |   @discardableResult
11:26:30  454 |   public func yield() -> YieldResult where Element == Void {
11:26:30  455 |     return storage.yield(()).nonThrowingRepresentation
11:26:30      |                    `- note: generic specialization called here
11:26:30  456 |   }
11:26:30  457 | }
11:26:30  
11:26:30  [#EmbeddedRestrictions]: <https://docs.swift.org/compiler/documentation/diagnostics/embedded-restrictions>

it looks to me like it's complaining about a call site that should be specialized to a Never-throwing function, yet the compiler seems to think it throws an existential error for some reason. this further makes me wonder whether withUnsafeThrowingContinuation itself would need changes to support this sort of thing.

my current outstanding questions are:

does this embedded swift CI error make sense to anyone? does embedded swift need some sort of special handling? does withUnsafeThrowingContinuation need to adopt typed throws before this sort of refactor can be done?

I think we could do something like this:

struct IntError: Error {}


func getIntOrIntError() throws(IntError) -> Int {
	let value = Int.random(in: 0...100)

	if value >= 50 {
		return value
	} else {
		throw IntError()
	}
}


func next() async throws(IntError) -> Int {
	let result = await withUnsafeContinuation { (continuation: UnsafeContinuation<Result<Int, IntError>, Never>) in
		do throws(IntError) {
			continuation.resume(returning: .success(try getIntOrIntError()))
		} catch {
			continuation.resume(returning: .failure(error))
		}
	}

	return try result.get()
}

However, in my opinion, it would make more sense to first update the fundamental concurrency building blocks to support typed throws.

This is the current pattern that seems to work the best imho. Perhaps we could use this as pressure to get a typed throws version of withUnsafeContinuation?

+1 I think it would be fantastic if we could get both withTaskCancellationHandler and the two throwing continuation APIs updated to support typed throws. As essential building blocks for asynchronous sequences, they would reduce implementation friction.

tangential, but perhaps of interest to you: withTaskCancellationHandler was recently updated on main to support typed throws.


updating withUnsafeContinuation would be nice, but is it strictly necessary to do what we're discussing here? i still haven't managed to figure out why the compiler is falling down when building for embedded swift...

Thanks for the info! Wasn't aware of it, but great to see progress there.

Even when using the result pattern? It should work just fine. It’s quite neat for these use cases and less verbose than I made it seem in my first comment.

I’m not well acquainted with the state of concurrency in Swift Embedded, but do these APIs even work there currently?

indeed i think it may work – at least it got me past some local build issues. i had not tried fully converting to a result based implementation that never called withUnsafeThrowingContinuation, but that seems to have done the trick so far. thanks for the suggestion!
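
For readers following along, a result-based `next()` that never calls withUnsafeThrowingContinuation might look roughly like the sketch below. This is not the code from the draft PR: `awaitResult`, `DemoError`, and `demoNext` are hypothetical names, and the sketch assumes a Swift 6 compiler where `Result.get()` uses typed throws. Because neither closure throws, it does not depend on typed-throws support in withTaskCancellationHandler or in the continuation APIs:

```swift
// Hypothetical helper, not part of the standard library.
func awaitResult<Success: Sendable, Failure: Error>(
  onCancel: @escaping @Sendable () -> Void,
  _ start: (UnsafeContinuation<Result<Success, Failure>, Never>) -> Void
) async throws(Failure) -> Success {
  // The continuation itself never throws; any failure travels inside the Result.
  let result: Result<Success, Failure> = await withTaskCancellationHandler {
    await withUnsafeContinuation {
      (continuation: UnsafeContinuation<Result<Success, Failure>, Never>) in
      start(continuation)
    }
  } onCancel: {
    onCancel()
  }
  // Unwrapping here rethrows the stored failure with its concrete type.
  return try result.get()
}

struct DemoError: Error {}

// Example use: the continuation is resumed immediately, purely for illustration.
func demoNext() async throws(DemoError) -> Int? {
  try await awaitResult(onCancel: {}) {
    (continuation: UnsafeContinuation<Result<Int?, DemoError>, Never>) in
    continuation.resume(returning: .success(42))
  }
}
```

In a real storage type, `start` would hand the continuation to the existing locking and buffering logic and `onCancel` would call the storage's cancel function, as in the snippets earlier in the thread.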

i'm also not familiar and unsure. it seems like something weird happens when using the throwing continuation API in embedded swift, but it's plausible that it just can't be used in that context currently.
