AnyAsyncSequence

We decided to use a custom type conforming to AsyncSequence that just internally holds onto a concrete type that we can switch out later. This allows to not leak implementation details and gives us flexibility in the future. But we are looking forward to this:

Long term, the intention is to expand opaque result types so that they can be used with where constraints. This would allow implementors to return a concrete sequence as an opaque sequence where Element == T , so prevent callers from relying on the specific type they return, allowing it to be changed later without a source break (or, in the case of a library built for distribution, an ABI break).

2 Likes

Another use-case for the thread, assuming full generalised existentials are still a while away. Opaque result types don't seem to cover this.

class Player {
    let media: Media // Concrete type is determined dynamically in the initialiser - can't be generic.
}
protocol MediaSource {
    // I specifically want to be able to observe changes over time in this value
    var isPlaying: AnyAsyncSequence<Bool> {get}

    …
}
struct AudioMediaSource: MediaSource { /* AVAudioPlayer */ }

struct VideoMediaSource: MediaSource { /* AVPlayer */ }

struct CustomMediaSource: MediaSource { /* A corresponding third-party playback solution */ }

Not applicable:

  • Convert isPlaying to use an associated type - Media can no longer be stored in a property.
  • Opaque type constraints (where clause) - opaque types are not allowed as a protocol requirement, and the Media property specifically isn't limited to a single type.
  • Change the requirement to @Published var isPlaying: Bool - property wrappers can't be used in a protocol.

Workarounds which make the case for type erasure:

  • Wrap the underlying async sequences in an AsyncStream - this is effectively type erasure, though it's a little tricky to handle extras like cancellation properly.
  • Use AnyPublisher from Combine - an existing type-erased wrapper.
  • Manually implement AnyMedia.

Workarounds:

  • Replace the protocol with a single struct, and the concrete types with enum cases. Do a switch within each method on the struct to provide appropriate behaviour. This isn't compatible for a framework use-case where third-parties may wish to add conformances, but could work for me.

Although some AsyncSequence is pretty interesting, it doesn't seem to help when I need to expose abstract logic through concrete APIs.

For example, if I have some use case logic that retrieves a list of orders, I'd want to reuse some underlying custom decoder and error handling that I can pass to all my fetch methods (see how fetchModels<T>() is used within fetch orders and products methods):

struct MyOrder: Decodable {...}

func fetchOrders() -> AsyncStream<[MyOrder]> {
    fetchModels() // Decodes using inference
}

func fetchProducts() -> AsyncStream<[MyProduct]> {
    fetchModels()
}

...

var values: AsyncStream<URLSessionWebSocketTask.Message>> {...}

func fetchModels<T>() -> AsyncStream<[T]> where T: Decodable {
    values.compactMap { (result: URLSessionWebSocketTask.Message) in
        switch result {
        case let .success(message):
            switch message {
            case let .string(string):
                guard let data = string.data(using: .utf8) else { return nil }
                return try? JSONDecoder().decode(T.self, from: data)
            case let .data(data):
                return try JSONDecoder.apiDecoder.decode(T.self, from: data)
            @unknown default:
                return nil
            }
        case let .failure(error):
            return nil
        }
    }
}

I'm having a hard time seeing how some AsyncSequence would help here. I'm using inference for fetchOrders() -> AsyncStream<[MyOrder]> to call fetchModels<T>() and passively let it know type to decode within it. Casting didn't work, plus it broke the generic inference (so it can internally know what to decode as):

func fetchOrders() -> AsyncStream<[MyOrder]> {
    fetchModels() as AsyncStream<[MyOrder]>
}

func fetchProducts() -> AsyncStream<[MyProduct]> {
    fetchModels() as AsyncStream<[MyProduct]>
}

func fetchModels<T>() -> some AsyncStream {...}

The AnyAsyncSequence made by @John-Connolly worked amazingly tho (thank you!!! :star_struck:):

func fetchOrders() -> AnyAsyncSequence<[MyOrder]> {
    fetchModels().eraseToAnyAsyncSequence()
}

func fetchProducts() -> AnyAsyncSequence<[MyProduct]> {
    fetchModels().eraseToAnyAsyncSequence()
}

func fetchModels<T>() -> AnyAsyncSequence<T> {...}

The some AsyncSequence route seems much more elegant but I can't make it solve my scenario like the type erasing can. Maybe someone can make the opaque type version work in this case as good as the type erasing, otherwise not ideal but hopefully we can get the native AnyAsyncSequence counterpart like AnyPublisher.

2 Likes

Unfortunately, solution from John-Connolly is far from perfect... because from AsyncSequence (for extension and generics) we don't know if AsyncIteratorProtocol.next throws, rethrows or doesn't throw an error... so AnyAsyncSequence from AsyncStream needs try for iteration and AsyncStream doesn't... It all makes me unhappy... Why just not to create two protocols AsyncSequence and AsyncThrowingSequence and just return AsyncStream or AsyncThrowingStream for AsyncSequence.map/compactMap as Sequence.map/compactMap returns Array... it's very hard to use AsyncSequence for interfaces and dependency injection... will continue using Combine with AnyPublisher. I don't like AsyncSequence very much, it's very inconvenient... thanks for great API...

4 Likes

This is ridiculous. Makes AsyncSequence confined to demos at best unfortunatelly.

2 Likes

why not atleast a primary associated type so I can do some AsyncSequence<String>?

1 Like

If you read swift-evolution/0358-primary-associated-types-in-stdlib.md at main · apple/swift-evolution · GitHub you will find out why it is not yet available.

3 Likes

Has there been any updates on this? This is something sorely needed. Especially if you use something the AsyncAlgorithms package.

For example, something simple like this already becomes super unweildly:

private var testSequence: AsyncMapSequence<AsyncCombineLatest2Sequence<AsyncStream<String>, AsyncStream<Bool>>, String> {
    let stream1 = AsyncStream {
        return "Test"
    }

    let stream2 = AsyncStream {
        return true
    }

    return AsyncCombineLatest2Sequence(stream1, stream2)
        .map { (string, bool) -> String in
            if bool {
                return "\(string) is true"
            } else {
                return "\(string) is false"
            }
        }
}

you could do something like this, but then you lose access to type information without casting:

private var testSequence: some AsyncSequence {
    let stream1 = AsyncStream {
        return "Test"
    }

    let stream2 = AsyncStream {
        return true
    }

    return AsyncCombineLatest2Sequence(stream1, stream2)
        .map { (string, bool) -> String in
            if bool {
                return "\(string) is true"
            } else {
                return "\(string) is false"
            }
        }
}

private func test() async throws {
    for try await value in testSequence {
        // value is (some AsyncSequence).Element
    }
}
4 Likes

I agree, for now it is very annoying to use AsyncSeqence crossing api boundaries. As a workaround, I always use AsyncStream (feeding it the values from another AsyncSequence) as a return type. I really hope we will get primary associated types for AsyncSequence soon.

5 Likes

Do we know whyAsyncSequence doesn't have a primary associated type? Is it hard to implement for that particular type or are there other concerns?

The problem is about error handling. @hborla summarized it well in the SE-0346 review thread:

See also the Pitch thread of SE-0358:

1 Like

Primary associated types have been helpful, but they're still a band-aid, and they don't even yet cover all synchronous cases that should be expressible with multiple somes. E.g.

import typealias Algorithms.StridingSequence

public extension Sequence {
  /// Distribute the elements as uniformly as possible, as if dealing one-by-one into shares.
  /// - Note: Later shares will be one smaller if the element count is not a multiple of `shareCount`.
  @inlinable func distributedUniformly(shareCount: Int)
  -> LazyMapSequence<Range<Int>, StridingSequence<DropFirstSequence<Self>>> {
    (0..<shareCount).lazy.map {
      dropFirst($0).striding(by: shareCount)
    }
  }
}

Array((1...10).distributedUniformly(shareCount: 3)).map(Array.init))
[[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]]

We need full constraints for opaque return types.

It's not quite as elegant as the proposed var values: some AsyncSequence<String>, but you can do:

var values: AsyncMapSequence<some AsyncSequence, String> {
    // ...
}
2 Likes

If AsyncSequence had an primary associated type Element, you could write the code like this:

func getStrings() -> some AsyncSequence<String?> {
  AsyncStream<Data> { continuation in 
    // fetch data
  }.map { data in
    String(data: data, encoding: .utf8)
  }
}

As it stands, it doesn't, but you could easily work around this.

public protocol SomeAsyncSequence<Element>: AsyncSequence { }
extension AsyncMapSequence: SomeAsyncSequence { }

func getStrings() -> some SomeAsyncSequence<String?> {
  AsyncStream<Data> { continuation in 
    // fetch data
  }.map { data in
    String(data: data, encoding: .utf8)
  }
}

There are probably other issues, but importantly, @rethrows won't propagate.

import AsyncAlgorithms
let sequence = [()].async
var collected = await Array(sequence)
let someSequence: some SomeAsyncSequence<Void> = sequence
collected = await .init(someSequence) // Call can throw but is not marked with 'try'

Then you could just change SomeAsyncSequence to be a rethrows protocol.

@rethrows public protocol SomeAsyncSequence<Element>: AsyncSequence { }

No, that’s precisely what I was saying doesn’t work.

I wonder if that has to do with the protocol inheritance or the some type. I think a rethrows protocol is supposed to determine whether it throws based on how the requirements are fulfilled.

I too find this strong typing hard to deal with, especially for us developing APIs using AsyncSequence. Say we have a protocol

public protocol CustomFeed {
    /// The type of the sequence of events emitted by this feed.
    associatedtype Events: AsyncSequence & Sendable where Events.Element == CustomFeedEvent
    
    /// The events emitted by this feed.
    var events: Events { get }
}

When a user implement their type that conforms to the CustomFeed protocol, they have to provide a var events: AsyncStream<CustomFeedEvent> - Not AsyncMapSequence<AsyncStream<Int>, CustomFeedEvent> (such as they may want to create some mock data to test it), not AsyncThrowingCompactMapSequence<AsyncLineSequence<URL.AsyncBytes>, CustomFeedEvent> (such as they may need to parse some file line-by-line to generate the feed).

It makes us designing the API hard, as we cannot provide all the combinatorial amount of APIs to handle the input types, so it is the user's responsibility to fit their input into our typing requirement.

That is also hard, because there isn't a built-in way to map an AsyncStream<T1> into an AsyncStream<T2> with a transforming function T1T2 , as it works for mapping Array<T1> into Array<T2>.

It can be done without too much code.

extension AsyncStream {
  func map2(transform: (Element) async -> Element) -> AsyncStream<Element> {
    AsyncStream { continuation in 
      for await element in self {
        await continuation.yield(transform(element))
      }
      continuation.finish()
    }
  }
}

That is not as efficient as AsyncMapSequence though.