Type erasure of AsyncSequences

I have been using the async algorithms package quite a bit recently, and one of the biggest pain points for me is how verbose the inferred types tend to get when you start stringing these AsyncSequences together.

Similar to Combine's AnyPublisher<Element, Failure>, I believe there should be an easy way to erase the type of the async sequence that is being produced.

Ideally this would use the some and any keywords rather than a wrapper type like the one we have with Combine. For example, something like this:

// non throwing
let mySequence: any AsyncSequence<Int, Never> = AsyncStream<Int> {
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    return Int.random(in: 0...100)
}

// throwing
let myThrowingSequence: any AsyncSequence<Int, MyError> = AsyncThrowingStream<Int, MyError> {
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    let int = Int.random(in: 0...10)
    if int == 5 {
        throw MyError()
    } else {
        return int
    }
}

At my current company (and in my own personal projects) I use something like this. It's not great, but it gets the job done. I just wish there was an official way to do this.

fileprivate extension AsyncStream {
    init<Base: AsyncSequence>(from sequence: Base, file: StaticString = #filePath, line: UInt = #line) where Element == Base.Element {
        var iterator = sequence.makeAsyncIterator()
        // FIXME: In later swift versions, AsyncSequence protocol will likely have an associated error type.
        // FIXME: For now, produce an assertionFailure to let developer know to use an AsyncThrowingStream instead.
        self.init {
            do {
                return try await iterator.next()
            } catch {
                assertionFailure("AsyncSequence threw \(error.localizedDescription). Use AsyncThrowingStream instead", file: file, line: line)
                return nil
            }
        }
    }
}

fileprivate extension AsyncThrowingStream {
    init<Base: AsyncSequence>(from sequence: Base) where Element == Base.Element, Failure == Error {
        var iterator = sequence.makeAsyncIterator()
        self.init {
            try await iterator.next()
        }
    }
}

extension AsyncSequence {
    /// Type erases the `AsyncSequence` into an `AsyncStream`
    /// - Returns: An `AsyncStream` created from the base `AsyncSequence`
    ///
    /// - Note: AsyncSequences do not expose their error type.
    /// So this function is available for both throwing and non-throwing `AsyncSequences`.
    /// It will produce an `assertionFailure` at runtime if the base sequence throws.
    func asAsyncStream(file: StaticString = #filePath, line: UInt = #line) -> AsyncStream<Element> {
        AsyncStream(from: self, file: file, line: line)
    }

    /// Type erases the `AsyncSequence` into an `AsyncThrowingStream`
    /// - Returns: An `AsyncThrowingStream` from the base `AsyncSequence`
    func asAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
        AsyncThrowingStream(from: self)
    }
}

I was literally thinking about the same problem yesterday. What would it take to add primary associated types to AsyncSequence?


You can define a refined protocol, and add primary associated types to that.

protocol MyAsyncSequence<Element>: AsyncSequence {}

struct _AsyncSequenceWrapper<Base: AsyncSequence>: MyAsyncSequence {

  typealias Element = Base.Element
  typealias AsyncIterator = Base.AsyncIterator

  var base: Base

  func makeAsyncIterator() -> AsyncIterator {
    base.makeAsyncIterator()
  }
}

extension AsyncSequence {
  func asOpaque() -> some MyAsyncSequence<Element> {
    _AsyncSequenceWrapper(base: self)
  }
}

Clients will see the name "MyAsyncSequence", but besides that, everything will work, including their own wrappers and generic functions using plain "AsyncSequence" constraints. They won't see the "_AsyncSequenceWrapper" at least:

func returnsOpaque(_ url: Foundation.URL) -> some MyAsyncSequence<String> {
  url.lines.asOpaque()
}

func acceptsGeneric(_ seq: some MyAsyncSequence<String>) async throws {
  for try await line in seq {
    print(line.uppercased())
  }
}

func usesExistential() async throws {

  let x: any MyAsyncSequence<String>

  if Bool.random() {
    x = URL(string: "http://example.com")!.lines.asOpaque()
  } else {
    x = AsyncStream(String.self) { continuation in
      continuation.yield("test")
      continuation.finish()
    }.asOpaque()
  }

  // You need to unwrap the existential to use it in a for loop,
  // otherwise the value has type 'Any'. So we pass it in to 'acceptsGeneric', which unwraps.
  //
  // The same thing happens when iterating an 'any Collection<String>',
  // even though that does have primary assoc types.

  try await acceptsGeneric(x)
}

The underlying problem is that we have tied these important language features together in a very unfortunate way, such that clients are severely limited in how they can communicate type relationships outside of primary associated types. That puts pressure on protocol authors to commit to primary associated types even when they may not be ready to do so.

There is no inherent reason it has to be this way. You should be able to communicate erased async sequences without a standard library update, and the standard library authors shouldn't have to worry about what it means for their ability to effectively evolve the protocol.

So I would argue that the solution is not to heap even more pressure on AsyncSequence. We should instead focus on efforts to improve this part of the language so that clients can use these convenient features without specific involvement from library authors.


Check out Point-Free's Concurrency Extras. It has type-erasure helpers and offers something similar to Combine through .eraseToStream() and .eraseToThrowingStream().

link: GitHub - pointfreeco/swift-concurrency-extras: Useful, testable Swift concurrency.


This is really nice, but it strips the throwing context from the type: consumers have to consume every MyAsyncSequence from within a do...catch (or for try! await in) regardless of whether or not the base sequence throws. It's great for internal APIs, though, or where you need the added performance of generic specialisation.
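To make that concrete, here is a minimal sketch that repeats the refined-protocol wrapper from the earlier post and then consumes a sequence whose base (a plain AsyncStream) can never throw. The helpers collect and makeNumbers are hypothetical names introduced only for illustration.

```swift
// Refined protocol and wrapper, as in the earlier post.
protocol MyAsyncSequence<Element>: AsyncSequence {}

struct _AsyncSequenceWrapper<Base: AsyncSequence>: MyAsyncSequence {
    typealias Element = Base.Element
    typealias AsyncIterator = Base.AsyncIterator
    var base: Base
    func makeAsyncIterator() -> AsyncIterator { base.makeAsyncIterator() }
}

extension AsyncSequence {
    func asOpaque() -> some MyAsyncSequence<Element> {
        _AsyncSequenceWrapper(base: self)
    }
}

// Hypothetical helper: gathers every element into an array.
// The `try` is mandatory. The opaque type says nothing about whether
// iteration throws, so the compiler has to assume that it can.
func collect(_ sequence: some MyAsyncSequence<Int>) async throws -> [Int] {
    var result: [Int] = []
    for try await value in sequence {
        result.append(value)
    }
    return result
}

// Hypothetical producer: the base is an AsyncStream, which cannot throw,
// yet callers only see `some MyAsyncSequence<Int>`.
func makeNumbers() -> some MyAsyncSequence<Int> {
    AsyncStream<Int> { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }.asOpaque()
}
```

Dropping the try inside collect should fail to compile, even though nothing in makeNumbers can throw. That is the lost throwing context described above.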

This is the technique I'd likely reach for where performance is non-critical. It's easy enough to do without adding an external dependency, and I imagine performance would be no worse than a type-erased Combine publisher.


More generally, I'd love to understand the vision for asynchronous sequences going forward.

When asynchronous sequences first came along I originally assumed they would be able to eventually match the capabilities of Rx derivative frameworks (like Combine) 1-1. That's not to say to become a mirror image, but rather to say that for everything you might be able to do with an Rx Observable you would also be able to do with an asynchronous sequence.

There seem to be two categories of capability for which we're waiting:

  1. Those capabilities for which we need new language features
  2. Those capabilities which are at odds with some internal vision for asynchronous sequences

In the first category, it would be great to see a vision document which outlines what we want to achieve and the status of the language feature needed to bring it to fruition. For example, to vend opaque types without a primary associated type, I imagine we need not only some way of expressing a generic constraint on an opaque type, but also a way of expressing the 'throwiness' (for want of a better term) of that type so that it is compatible with rethrows.

In the second category, it would be useful to hear what asynchronous sequences aren't. For many of us who are used to using Rx derivatives to move data around our UIs, reaching for asynchronous sequences to perform the same job is seductive. However, the cost of doing so is that 1) you unavoidably introduce context switching, hopping to and from the main actor with careless abandon for each and every element vended, and 2) you break view invariants and miss animation transactions with what seems to be the only delivery guarantee: data you send won't arrive within the current run loop cycle. This isn't tenable for UIs. And that's fine. We just need to know what asynchronous sequences are, and more importantly what they're not, so that if necessary we can focus on alternatives.


I think I found a workaround, but it's kinda wordy.

// Basic trick
protocol AsyncThrowingIteratorProtocol<Element>: AsyncIteratorProtocol {}

protocol AsyncNonThrowingIteratorProtocol<Element>: AsyncIteratorProtocol {
  mutating func next() async -> Element?
}

protocol AsyncSequenceOf<Element>: AsyncSequence {}

protocol AsyncNonThrowingSequenceOf<Element>: AsyncSequenceOf where Self.AsyncIterator: AsyncNonThrowingIteratorProtocol {}

typealias AsyncStreamAsyncIterator<T> = AsyncStream<T>.AsyncIterator

extension AsyncStreamAsyncIterator: AsyncNonThrowingIteratorProtocol {}

extension AsyncStream: AsyncNonThrowingSequenceOf {}

extension AsyncThrowingStream: AsyncSequenceOf {}

// Support for `.prefix(_ count:Int)`
typealias AsyncPrefixSequenceIterator<T: AsyncSequence> = AsyncPrefixSequence<T>.Iterator
extension AsyncPrefixSequenceIterator: AsyncThrowingIteratorProtocol {}
extension AsyncPrefixSequenceIterator: AsyncNonThrowingIteratorProtocol where Base: AsyncNonThrowingSequenceOf {
  mutating func next() async -> Base.Element? {
    /// Looks like a bug in the compiler. It has enough info
    /// to deduce this by itself.
    /// But for now we have to do it manually.
    /// Here we must call the implementation of
    /// `AsyncThrowingIteratorProtocol<Base.Element>.next` on `self`.
    /// But there is no syntax to express something like:
    /// `self.(AsyncThrowingIteratorProtocol.next)()`
    /// At least to my knowledge.
    /// So instead we erase `self` to the desired protocol via opaque,
    /// call the function and apply changes made on the opaque back to `self`.
    var s: some AsyncThrowingIteratorProtocol<Base.Element> = self
    defer { self = unsafeBitCast(s, to: Self.self) }
    return try! await s.next()
  }
}
extension AsyncPrefixSequence: AsyncSequenceOf {}
extension AsyncPrefixSequence: AsyncNonThrowingSequenceOf where Base: AsyncNonThrowingSequenceOf {}

// Example
var s1: some AsyncNonThrowingSequenceOf<Int> {
  AsyncStream<Int> {
    try! await Task.sleep(nanoseconds: 1_000_000_000)
    return Int.random(in: 0...100)
  }.prefix(10)
}

var s2: some AsyncSequenceOf<Int> {
  AsyncThrowingStream<Int, Error> {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    return Int.random(in: 0...100)
  }.prefix(10)
}

var it = s1.prefix(3).makeAsyncIterator()
while let i = await it.next() { print(i) } // OK

//for await i in s1.prefix(10) { } // Error: "Call can throw, but the error is not handled" :( Seems like a bug in the compiler

for try await i in s2.prefix(3) { print(i) } // OK


In my view the missing language-level feature is being able to express some throws AsyncSequence<T> and some AsyncSequence<T>. This would allow the simplest spelling to express the non-throwing behavior (which falls in line with how functions are annotated). The other closely associated wrinkle is annotations for Sendable.

Having a solution for these cases would handle almost all normal use cases. It would NOT, however, handle the cases where types need to be exposed as a composition. E.g. the function map currently returns an AsyncMapSequence, but it can't be a some AsyncSequence<Base.Element> because something earlier in the chain could make it throw; likewise it can't be some throws AsyncSequence<Base.Element> because it may be the case that it would never throw.

TBH, that missing use case (useful as it would be) is perhaps not as important in my book as ensuring that developers using these don't have to write out lengthy generic signatures.


Interesting. Wouldn't this syntax be a little non-specific to be a generalised language feature? It works for asynchronous sequence as it has only one throwing function, but does that not limit re-throwing protocols to just the one throwing method per type?

Would you mind clarifying on this point a little?

On the one hand it seems like you're saying you wouldn't be able to return a re-throwing asynchronous sequence (such as AsyncMapSequence) as an opaque AsyncSequence, but on the other you say we want to avoid lengthy generic signatures. It would be really unfortunate if we couldn't return an AsyncMapSequence as an opaque AsyncSequence.

We talked a bit about language features to support generalizing AsyncSequences in the more general context of "all the things we need to be generic over but currently can't be" in Algebraic Effects - #18 by KeithBauerANZ


How much of a performance hit are we talking about here? I am working on a large app with heavy Combine usage which uses a lot of eraseToAnyPublisher, and I have noticed no performance hit, at least no visible one.
I would like to use any AsyncSequence, of course, but that is not possible at this time. Roughly how many orders of magnitude slower would you expect type erasure to be compared to no type erasure? Has anyone run tests to investigate?

It depends a lot on what you're doing. If you type-erase a byte stream, for example, it'll probably be completely unusable; if you type-erase something that does big chunks of work and fires a couple of times, it'll probably be irrelevant.


In this case why not just use AsyncStream?

I might. I am also exploring options, at this moment in time I am still using Combine but am considering slowly adopting more of Swift Concurrency.

That's what I do today.

I made an extension on async sequence that erases to an async stream.

The main problem is that async sequences do not expose their error type, so there's no way to guarantee that an AsyncSequence -> AsyncStream conversion will actually never throw.

It would be nice if, in Swift 6 (or sooner), AsyncSequence used a primary associated type for its error as well.
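For reference, Swift 6 did go in this direction: SE-0421 gives AsyncSequence the primary associated types Element and Failure. The following is a minimal sketch, assuming a Swift 6 toolchain and, on Apple platforms, the OS versions named in the availability annotations. The function names countdown and total are hypothetical.

```swift
// Hypothetical producer: hides the concrete AsyncStream behind an opaque
// type that still advertises that it never fails.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func countdown(from start: Int) -> some AsyncSequence<Int, Never> {
    AsyncStream<Int> { continuation in
        for value in stride(from: start, through: 1, by: -1) {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Hypothetical consumer: because Failure is Never, neither `try` nor
// `throws` is needed here.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func total(of sequence: some AsyncSequence<Int, Never>) async -> Int {
    var sum = 0
    for await value in sequence {
        sum += value
    }
    return sum
}
```

On older toolchains or deployment targets this spelling is unavailable, and the AsyncStream-based erasure described above remains the fallback.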

Using an AsyncStream is a really blunt tool in this case, and it comes with significant usability and performance downsides compared to the native any/some features.


How would one express which function throws with this syntax? To me this seems like it would be pretty limited in its use compared to finding a way to make a primary associated type work here instead.

Maybe we need typed throwing?

so that you can do something like this:

protocol AsyncIterator<Element, Failure> {
    associatedtype Element
    associatedtype Failure: Error
    // Typed throws, written with the `throws(Failure)` spelling.
    mutating func next() async throws(Failure) -> Element?
}

protocol AsyncSequence<Element, Failure> {
    associatedtype Element
    associatedtype Failure: Error
    associatedtype Iterator: AsyncIterator<Element, Failure>
    func makeAsyncIterator() -> Iterator
}

Could the compiler then allow the omission of the try keyword for functions that throw Never?

The problem comes when you have two error types:

Consider a zip of some AsyncSequence<A, AError> and some AsyncSequence<B, BError>: what would the error type of the zip be? Fully typed throws immediately requires (or at least brings up the question of) union types. The answer would be AsyncZipSequence<(A, B), AError | BError>, which is a huge can of worms (and possibly not solvable in full generality).

Given that union-type problem, it makes sense that the error in that case becomes an existential any Error. But if that is the case, didn't you just lose the typing? It raises the question: why have fully typed errors in the first place? Instead, generic throwing machinery could be restricted to the field of Never and any Error, meaning that zip's error model would be Never + Never = Never, Never + any Error = any Error, and any Error + any Error = any Error.
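The combination rule above can be written down as a tiny value-level model. This is only a sketch of the arithmetic, not compiler machinery; FailureKind and combinedFailure are hypothetical names.

```swift
/// Hypothetical value-level stand-in for the two failure types under discussion.
enum FailureKind: Equatable {
    case never     // models `Never`
    case anyError  // models `any Error`
}

/// Failure of a combined sequence (for example a zip):
/// the result can fail if either input can fail.
func combinedFailure(_ lhs: FailureKind, _ rhs: FailureKind) -> FailureKind {
    (lhs == .never && rhs == .never) ? .never : .anyError
}
```

With only two possible values the operation is total and needs no union types, which is the appeal of restricting the model this way.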

Yes, the developer chose to lose the typing by using zip with two different error types. I'm not sure how that developer choice is relevant to the overall API design. Frankly, I find this argument more convincing for adding an error caster (.error(as: UnionError.self)) to an AsyncSequence with an error type than for simply not having an error represented at all.


Doesn't this problem already exist in many areas in the current version of Swift? Task, AsyncThrowingStream, ThrowingTaskGroup, and [Checked|Unsafe]Continuation all have Failure types defined that can only ever be Never or any Error.

Although it would be nice to be able to have fully typed failures, having only these two cases available is still useful. The same holds for AsyncSequence. Right now we have virtually no way of defining opaque interfaces that return a type like [some|any] AsyncSequence<Element, Failure>, and I think that's a shame. In my own work, I recently had to define a custom AnyAsyncSequence<Element>* that is assumed to always be failable, because there doesn't seem to be a way to define it as not failing. This feels like a rather awkward hole in the standard library at the moment.

I realize that, IIRC, this would be the first protocol to follow this pattern; maybe that plays into the hesitation I sense from the Swift team about this?

*To make a long story short, AsyncThrowingStream didn't work as a "type-erased" wrapper because of the way its continuation "eagerly" executes work.
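For readers wondering what such a wrapper might look like, here is a minimal closure-based sketch. It is an assumption about the general shape, not the implementation referred to above. It always presents itself as throwing, and it does not touch the base sequence until a consumer asks for an iterator.

```swift
/// Hypothetical lazy type eraser. It is always treated as throwing,
/// because the base sequence's failure behaviour is unknown here.
struct AnyAsyncSequence<Element>: AsyncSequence {
    struct AsyncIterator: AsyncIteratorProtocol {
        // Pulls the next element from the erased base iterator.
        let nextElement: () async throws -> Element?

        mutating func next() async throws -> Element? {
            try await nextElement()
        }
    }

    // Creating the base iterator is deferred until iteration starts.
    private let makeIterator: () -> AsyncIterator

    init<Base: AsyncSequence>(_ base: Base) where Base.Element == Element {
        makeIterator = {
            var iterator = base.makeAsyncIterator()
            return AsyncIterator { try await iterator.next() }
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        makeIterator()
    }
}
```

Unlike a Task feeding an AsyncThrowingStream continuation, nothing runs ahead of the consumer: each call to next() forwards exactly one request to the base iterator. The cost is one closure call per element.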
