SE-0314 (Second review): AsyncStream and AsyncThrowingStream

@Philippe_Hausler I'm not following the reason for two iterator types. The nested iterator types do capture the Failure type, and when it's Never, the error would be eliminated if we used Result<Element, Failure> as the iterator's Element type instead. Is it not possible to extend the for-in syntax to automatically transform a Result into the throwing form that uses try, by adding .get() in the desugared code?

let array: [Result<Int, Error>] = ...
for try value in array { ... } // NEW
for case .success(let value) in array { ... }

// the former would be sugar for
var iterator = array.makeIterator()
while let value = try iterator.next()?.get() {
  ...
}
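A minimal synchronous sketch of that desugaring, runnable today on an array of Result values. The helper name forTry is hypothetical and only stands in for the proposed `for try` syntax; iteration stops at the first failure because Result.get() rethrows it.

```swift
// Hypothetical helper: roughly what `for try value in results { body(value) }`
// could desugar to if the compiler applied `.get()` automatically.
func forTry<T, Failure: Error>(
    _ results: [Result<T, Failure>],
    _ body: (T) throws -> Void
) throws {
    var iterator = results.makeIterator()
    // `next()` returns `Result?`; `.get()` unwraps a success or throws the failure.
    while let value = try iterator.next()?.get() {
        try body(value)
    }
}
```

Elements after the first failure are never visited, which matches the usual "a throwing sequence ends at its first error" behavior.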

I am not sure that would work for the rethrows cases, though, because your suggestion implies that we redo all of the implementations of the other AsyncSequences that were added, as well as the compiler implementation.

Hmm, now that you've mentioned the other sequences: technically, since we fixed the missing generic Failure type in this proposal, I would kind of assume that the other sequences (e.g. AsyncThrowingFilterSequence) should all have a Failure generic type parameter too. It feels like an oversight in the flood of this year's giant async feature.

To be clear, I'm not trying to be rude or to push more work onto you and the team working on all of this; I'm just sharing my personal point of view on how I, as a consumer of these APIs, imagine them to function. I personally prefer as much generalization over generic types as possible.


I did a quick-and-dirty experiment. I'm not sure about the effects of the @rethrows attribute, but I had to adjust AsyncIteratorProtocol, which is why I used a custom _AsyncSequence built on my custom _AsyncIteratorProtocol. With those in place, I could fairly easily rewrite AsyncThrowingMapSequence.

You might want to look at it and consider whether it is something Swift should do now (if there's still a bit of time left):

// Only used because we needed a change made in `_AsyncIteratorProtocol`.
public protocol _AsyncSequence {
  associatedtype AsyncIterator: _AsyncIteratorProtocol
  associatedtype Element where Self.Element == Self.AsyncIterator.Element

  func makeAsyncIterator() -> Self.AsyncIterator
}

public protocol _AsyncIteratorProtocol {
  associatedtype Element
  associatedtype Failure: Error // NEW

  // NEW return type
  mutating func next() async -> Result<Self.Element, Failure>?
}


// Untyped `throws` can only produce `Error`, so the base must also fail with `Error`.
extension _AsyncSequence where AsyncIterator.Failure == Error {
  @inlinable
  public __consuming func map<Transformed>(
    _ transform: @escaping (Element) async throws -> Transformed
  ) -> AsyncThrowingMapSequence<Self, Transformed, Error> {

    AsyncThrowingMapSequence(self) { element in
      // Wrap the value or the thrown error into a `Result`.
      do {
        return .success(try await transform(element))
      } catch {
        return .failure(error)
      }
    }
  }
}

public struct AsyncThrowingMapSequence<
  Base: _AsyncSequence,
  Transformed,
  Failure: Error
> where Base.AsyncIterator.Failure == Failure {
  // Make sure we align the failure types. This is also true in `Combine`
  // and we would need a separate sequence to change / map the `Failure` type.
  @usableFromInline
  let base: Base

  @usableFromInline
  // Use a result type
  let transform: (Base.Element) async -> Result<Transformed, Failure>

  @usableFromInline
  init(
    _ base: Base,
    transform: @escaping (Base.Element) async -> Result<Transformed, Failure>
  ) {
    self.base = base
    self.transform = transform
  }
}

extension AsyncThrowingMapSequence: _AsyncSequence {

  public typealias Element = Transformed
  /// The type of iterator that produces elements of the sequence.
  public typealias AsyncIterator = Iterator

  /// The iterator that produces elements of the map sequence.
  public struct Iterator: _AsyncIteratorProtocol {
    @usableFromInline
    var baseIterator: Base.AsyncIterator

    @usableFromInline
    let transform: (Base.Element) async -> Result<Transformed, Failure>

    @usableFromInline
    var finished = false

    @usableFromInline
    init(
      _ baseIterator: Base.AsyncIterator,
      transform: @escaping (Base.Element) async -> Result<Transformed, Failure>
    ) {
      self.baseIterator = baseIterator
      self.transform = transform
    }

    @inlinable
    public mutating func next() async -> Result<Transformed, Failure>? {
      guard
        !finished,
        let result = await baseIterator.next()
      else {
        return nil
      }
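      switch result {
      case .success(let element):
        let transformed = await transform(element)
        // Stop after the first failure, whether it came from the base or the transform.
        if case .failure = transformed { finished = true }
        return transformed
      case .failure(let error):
        finished = true
        return .failure(error)
      }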
    }
  }

  @inlinable
  public __consuming func makeAsyncIterator() -> Iterator {
    return Iterator(base.makeAsyncIterator(), transform: transform)
  }
}
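To see the shape of this design without the async machinery, here is a synchronous stand-in. ResultIteratorProtocol, ArrayResultIterator and MapResultIterator are hypothetical names that mirror _AsyncIteratorProtocol and the map iterator above: the failure type is an associated type, next() returns a Result, and the map iterator keeps the base's Failure type and stops after the first failure from either the base or the transform.

```swift
// Synchronous analog of `_AsyncIteratorProtocol`: Failure is part of the type.
protocol ResultIteratorProtocol {
    associatedtype Element
    associatedtype Failure: Error
    mutating func next() -> Result<Element, Failure>?
}

// Base iterator over a fixed array of results.
struct ArrayResultIterator<Element, Failure: Error>: ResultIteratorProtocol {
    var results: [Result<Element, Failure>]
    var index = 0

    mutating func next() -> Result<Element, Failure>? {
        guard index < results.count else { return nil }
        defer { index += 1 }
        return results[index]
    }
}

// Map iterator: the Failure type is inherited from the base, unchanged.
struct MapResultIterator<Base: ResultIteratorProtocol, Transformed>: ResultIteratorProtocol {
    typealias Element = Transformed
    typealias Failure = Base.Failure

    var base: Base
    let transform: (Base.Element) -> Result<Transformed, Base.Failure>
    var finished = false

    mutating func next() -> Result<Transformed, Base.Failure>? {
        guard !finished, let result = base.next() else { return nil }
        switch result {
        case .success(let element):
            let transformed = transform(element)
            if case .failure = transformed { finished = true }
            return transformed
        case .failure(let error):
            finished = true
            return .failure(error)
        }
    }
}
```

With a base of `[.success(1), .failure(.boom), .success(3)]` and a transform multiplying by 10, the iterator yields `.success(10)`, then `.failure(.boom)`, then nil.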

If and when we get typed throws, all we would need is a simple overload that reuses the existing, compatible generic type:

extension _AsyncSequence {
  @_alwaysEmitIntoClient // back deployment ?? 🙂
  @inlinable
  public __consuming func map<Transformed, Failure: Error>(
    _ transform: @escaping (Element) async throws Failure -> Transformed
  ) -> AsyncThrowingMapSequence<Self, Transformed, Failure> { ... }
}

That seems like a reasonable answer here. Use

continuation.onTermination = { [myInstance] _ in
}

to make it very explicit that our goal is to capture myInstance.
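A small sketch of the capture-list approach, assuming a hypothetical Resource class that must stay alive as long as the stream does. Because the handler actually uses the captured instance, this variant also avoids the unused-capture warning the compiler emits for an empty handler. The class is marked @unchecked Sendable only to keep the sketch short; its flag is not synchronized, and makeStream(retaining:) is likewise a hypothetical helper.

```swift
// Hypothetical resource that must live as long as the stream does.
// `@unchecked Sendable` and the unsynchronized flag are for brevity only.
final class Resource: @unchecked Sendable {
    private(set) var isClosed = false
    func close() { isClosed = true }
}

// Hypothetical factory returning the stream together with its continuation.
func makeStream(
    retaining resource: Resource
) -> (stream: AsyncStream<Int>, continuation: AsyncStream<Int>.Continuation) {
    var captured: AsyncStream<Int>.Continuation?
    let stream = AsyncStream<Int> { continuation in
        // The capture list states explicitly that the handler (and so the stream)
        // keeps `resource` alive; using it also avoids the unused-capture warning.
        continuation.onTermination = { [resource] _ in
            resource.close()
        }
        captured = continuation  // the build closure runs synchronously
    }
    return (stream, captured!)
}
```

Calling finish() on the returned continuation runs the termination handler, which closes the resource before the handler (and its capture) is released.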

Doug


Hello Swift Community,

The Core Team discussed this proposal and are ready to accept it. However, we're going to leave the thread open a couple more days to get more feedback on the AsyncStream type name: this facility has gone through several names (YieldingContinuation, AsyncSeries, AsyncStream) without landing on something that feels "right". Now that the rest of the design has settled, have we missed a great name along the way?

Doug


Just thinking out loud...

I think what makes this special is two-fold:

  1. Compared to the standard use of AsyncSequence, this allows the core logic to be push-based instead of pull-based. It will wrap any producer into a simple AsyncSequence.
  2. Compared to a standard async function, this allows returning a series of values instead of a single value. But I guess AsyncSequence does that too.
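The push/pull distinction in point 1 can be sketched with the two AsyncStream initializers from the proposal. makePushStream, makePullStream and the Countdown actor are hypothetical names; the actor is used only so that the unfolding closure can hold mutable state safely.

```swift
// Push: the producer decides when values arrive and pushes them
// into the stream's buffer through the continuation.
func makePushStream(_ values: [Int]) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in values {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Pull: nothing is produced until the consumer asks for the next element.
actor Countdown {
    private var remaining: Int
    init(from start: Int) { remaining = start }

    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

func makePullStream(from start: Int) -> AsyncStream<Int> {
    let countdown = Countdown(from: start)
    return AsyncStream(unfolding: { await countdown.next() })
}
```

Both are consumed the same way with `for await`; only who drives production differs.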

I like AsyncStream because it evokes Reactive Streams 🙂 And I guess this thing is meant to be a reasonable alternative to Reactive Extensions, Combine, etc.

On the other hand, what this really does is insert a (possibly empty) buffer between a producer and a consumer. So perhaps it's really just a BufferingIterator?

On the implementation side, I note that AsyncStream itself only holds a produce async closure, which is assigned to Storage.next() (retaining the storage, which is a final class).

I mention this because the new unfolding initialiser seems to make the initialisation procedure much more complex. Have any measurements been done on how this affects performance? If there is any significant regression, that could affect whether the unfolding initialiser is worth including.

Additionally, given that this type will presumably be @frozen, I wonder if it wouldn't be better for AsyncStream (and its iterator) to directly hold a reference to the storage, and to call next() directly, for potentially better inlining? Who knows what sort of impact it will have (it may even be worse), but in general I've found that closures are barriers to optimisation -- the compiler needs to do some serious heroics to have an idea of what any given closure might contain.

Perhaps [Async]SequenceSource or [Async]SequenceGenerator, which are more in line with what it actually does?


As to naming, I guess this new type is the answer to the question "How do I create an AsyncSequence?".

We typically use type-erased structs for this, like AnySequence or AnyIterator.

Perhaps we should simply name this AnyAsyncSequence or AnyAsyncIterator (and perhaps add a new initializer to type-erase)?

The "pull-based" initializer init(unfolding:onCancel:) is an async variant of UnfoldingSequence, I guess. Will we have something like sequence(state:next:) for this? 🙂
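For comparison, an async counterpart of sequence(state:next:) does not strictly need AsyncStream; it can be a small custom AsyncSequence. AsyncUnfoldSequence is a hypothetical name, not a standard library type, and this sketch does no cancellation handling.

```swift
// Hypothetical async analog of `sequence(state:next:)`: the state lives in the
// iterator, and `produce` advances it, returning nil to end the sequence.
struct AsyncUnfoldSequence<State, Element>: AsyncSequence {
    let initialState: State
    let produce: (inout State) async -> Element?

    init(state: State, next produce: @escaping (inout State) async -> Element?) {
        self.initialState = state
        self.produce = produce
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        var state: State
        let produce: (inout State) async -> Element?

        mutating func next() async -> Element? {
            await produce(&state)
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        // Each iterator starts from a fresh copy of the initial state.
        AsyncIterator(state: initialState, produce: produce)
    }
}
```

For example, starting from state 1 and doubling after each element until the state exceeds 8 yields 1, 2, 4, 8.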

@Douglas_Gregor I have an important question. We made sure that the proposed throwing type has a Failure: Error generic type parameter. That's great, but what about the transformation sequence types that also throw, which are used for operations like map, filter, etc.? Their throwing counterparts do not provide such a generic parameter. This will be very painful in the future when we need those types to have a Failure generic type: it would require introducing new types, which would also tie us to the new OS version in which they were introduced.

TL;DR: should we fix those as well, while we still have a bit of time left?

Also, as discussed above, the implementations could be greatly simplified if the compiler accepted Result and automatically applied a desugaring transformation to for-in when you use try. That would also require only one type for each sequence operation, plus a single AsyncStream<Element, Failure: Error>.


As for naming the types proposed here: I lean toward AsyncStream, which fits well if you're used to reactive frameworks such as Combine and others.


While your approach looks cleaner, it's not feasible in an environment that treats warnings as errors, because the compiler emits a warning for the captured but unused instance. _ = myInstance seems to be the only solution to the problem, which is a bit awkward.

cc @Philippe_Hausler

Would it make sense to introduce an overload that accepts a closure returning Any, which the AsyncStream would then retain?

AsyncStream { continuation in 
   ...
   return myInstance
}

I'm not sure whether that would cause trouble alongside the existing non-returning closure, though.

I was a bit overloaded recently and missed that the review deadline was up here...

Glad this is moving forward!
The latest adjustments based on feedback address the biggest concerns I voiced previously and make the type useful and actually usable; glad to see it.

I'll focus, then, on the topics @Douglas_Gregor asked us to focus on:


Naming

First the good things! All those are very nice:

  • the bufferingOldest, bufferingNewest, and unbounded names are very good!
    • It's the inverse of how I'm used to thinking about it, but it's great phrasing and probably makes the behavior easier to understand; +1 on those.
  • the enqueued(remaining) is good, though making any use of the "remaining" value is likely to be racy. To be honest, I'd drop that from the API and just have enqueued, but I can see that in a single-producer workflow it may be fun to use, perhaps to build an efficient prefetch.
  • Question: init(unfolding:onCancel:) is nice, though shouldn't the produce closure be allowed to throw as well?
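To make the buffering policies concrete, here is a sketch using the proposal's AsyncStream(_:bufferingPolicy:_:) initializer. With .bufferingNewest, a full buffer evicts its oldest element and reports it as .dropped; with .bufferingOldest, the incoming element itself is rejected and reported as .dropped. makeBufferedStream is a hypothetical helper that hands the continuation out of the build closure (which runs synchronously); to be safe, the stream is kept alive while yielding.

```swift
// Hypothetical helper: returns a bounded stream together with its continuation.
func makeBufferedStream(
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) -> (stream: AsyncStream<Int>, continuation: AsyncStream<Int>.Continuation) {
    var captured: AsyncStream<Int>.Continuation?
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        captured = continuation
    }
    return (stream, captured!)
}
```

With no consumer waiting, yielding 1, 2, 3 into a buffer of size 2 returns .dropped(1) for the third yield under .bufferingNewest(2), and .dropped(3) under .bufferingOldest(2); the successful yields return .enqueued(remaining:).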

The actual type: AsyncSequenceSource

I feel very strongly that calling this a "(Async)Stream" is a very bad idea, because it isn't "the stream".

It is a way to "generate" the "beginning" (source, publisher, producer, origin...) of a stream, and I'm glad feedback on the name is being requested rather than steamrolling ahead with it 🙏

I spent a long time naming this exact type in the past, and I really do think that name captured the best and most important semantics, so I'll bring it up here again.

The type's primary goal in life is:

  • bridge "non stream" / "non async" / "non back-pressure aware" APIs with the AsyncSequence
    • as such the name should imply it is closely related to the async sequence

The type inherently behaves like a "queue":

  • it is either bounded or unbounded, just like a queue
  • we offer elements to it from "the outside", from any thread, just like a queue
  • if it is bounded, it may drop elements, just like a bounded queue

As such, I really think adopting similar naming would be beneficial. At the same time, it gets tricky with too many concepts in the name: "AsyncSequenceSourceQueue", while very precise, would probably not be met with much love.

I think we could drop the Queue word and focus on the other important part of this API:

  • this type is NOT just some arbitrary part of a stream, it is always the "beginning", the source, of the stream.

As such, I think we should combine these two pieces of information into: AsyncSequence + Source = AsyncSequenceSource.

It makes it incredibly clear that:

  • this is used to "begin" async sequences,
  • there may be various kinds of sources: we already have two (unfold and the continuation-based one), and there could be other types of async sequence sources, such as timers, which may be useful to add here in the future.

The API would end up reading like this:

  • AsyncSequenceSource(unfold: ...) OR AsyncSequenceSource.unfold { ... }
  • AsyncSequenceSource(of: Int.self, .bufferingNewest(10)) { c in ... }
    • notice the of: ... which is to align the API shape with TaskGroup(of: Int.self) which we have today

And it is easy to talk about: "use a sequence source", etc. The pushback against this name has been that "other sources exist", but I don't think that is a good reason: we're designing core types for Swift concurrency here, types that will show up more and more, not less and less, so they deserve good names.

As someone who worked on the Reactive Streams (RS) specification, I'll chime in a little here. While the specification is called Reactive Streams, none of the implementations use the word "Stream" in their APIs, and for good reason: "the stream" is the composition of many stages/elements/operators, so no single operator/stage/element is "a stream"; it is rather a Source, Flow, or Sink (alternatively a Publisher, Processor, and Subscriber, or even a Producer, Processor, and Consumer; trivia: these used to be the original names 🙂). When a bunch of them get together and actually run, that is the stream.

In the same way, tracing systems never have a "trace" object; they have "spans", and a bunch of spans interconnected according to specific rules is the "distributed trace".


While still on the naming theme, and not challenging any of the functionality, only the naming inconsistencies: these are small but annoying inconsistencies that would be nice to iron out...

"yield" -> offer or push

I have some issues with "yield" in general, though, and therefore with YieldResult as a type name... these are not "yield" operations to me. I've never seen an API where a "yield" can fail, yet this is exactly what this is.

This operation is exactly what is typical in bounded queue implementations, where it is usually named "push" or "offer". I especially like offer because it expresses perfectly what this operation is, and that such an offer may be rejected (if no buffer space is available). This is an important feature of the API and the name should reflect it.

The type name YieldResult would be adjusted accordingly, to OfferResult or Offered.dropped (Offered.enqueued) or similar, allowing for:

switch source.offer(...) {
  case .terminated:
  case .dropped(...):
  case .enqueued(...):
}

Looking at this, one realizes that we're mixing terminology here again... neither "yield" nor "offer" makes sense to reply to with "enqueued" (!).

To be consistent in terminology, the result type could therefore be:

enum ElementOffer<Element> {
  case accepted(remaining: Int)
  case dropped(Element)
  case terminated
}

This would be a bit cleaner if we stick with offer(); it is normal to "accept an offer", after all.

If we went with push(), then enqueued makes sense again, because we're back in "queue terminology", IMHO.
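A hypothetical sketch of the offer()/ElementOffer naming proposed above, using a plain, single-threaded bounded buffer rather than AsyncStream. BoundedSource and its methods are illustrative only; in this sketch a full buffer rejects the new element (like .bufferingOldest) and reports it as .dropped.

```swift
// Hypothetical result type following the naming suggested above.
enum ElementOffer<Element> {
    case accepted(remaining: Int)
    case dropped(Element)
    case terminated
}

// Hypothetical bounded source with an `offer` API; not thread-safe.
struct BoundedSource<Element> {
    private(set) var buffer: [Element] = []
    private(set) var isTerminated = false
    let capacity: Int

    init(capacity: Int) { self.capacity = capacity }

    mutating func offer(_ element: Element) -> ElementOffer<Element> {
        guard !isTerminated else { return .terminated }
        // The offer is rejected when no buffer space is available.
        guard buffer.count < capacity else { return .dropped(element) }
        buffer.append(element)
        return .accepted(remaining: capacity - buffer.count)
    }

    mutating func terminate() { isTerminated = true }
}
```

A caller would then switch over .accepted, .dropped and .terminated, which reads naturally as "the offer was accepted or dropped".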

Minor: finish() -> complete()

I'm very weirded out by "finish", to be honest. It's not a hill worth dying on, I guess, but it's very weird to me; this could instead be closer to completion, where a completion is one of: successful, failed, cancelled.

As @DevAndArtist has called out above in the thread.
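For what it's worth, the termination reported by AsyncThrowingStream already distinguishes the three outcomes of the completion suggested above: finished without an error, finished with an error, and cancelled. The Completion enum and the completion(from:) helper below are hypothetical and only show that mapping.

```swift
// Hypothetical completion type along the lines suggested above.
enum Completion<Failure: Error> {
    case successful
    case failed(Failure)
    case cancelled
}

// Hypothetical helper mapping AsyncThrowingStream's termination onto it.
func completion<Element, Failure: Error>(
    from termination: AsyncThrowingStream<Element, Failure>.Continuation.Termination
) -> Completion<Failure> {
    if case .finished(let failure) = termination {
        // finished(nil) is a normal end; finished(error) is a failure.
        if let failure = failure {
            return .failed(failure)
        }
        return .successful
    }
    // The only other termination case in the proposal is .cancelled.
    return .cancelled
}
```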


Nitpicks while here, not a hill I'll die on but still worth pointing out:

  • BufferingPolicy.unbounded is missing documentation.
    • I would suggest adding documentation suggesting to not use this value as it can easily lead to explosive, unlimited, memory usage growth.
    • In apps that will just kill the app, but it is traumatic for server systems, where one such bad/silly stream growing forever would OOM and crash the entire server.
  • bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded: I said this in prior reviews and discussions, an unbounded default is IMHO a bad idea, since it basically disables any kind of backpressure in this type by default; but if people really like it, so be it.

For what it's worth, I would prefer just AsyncSource.

It's short, very simple, easy to remember. And it sounds like something you should use all the time 😁


Another name that Doug missed: Series, which some of the initial folks reviewing it before the proposal was posted did not like. I personally still like that name; I feel it is a nice short name that reflects what it represents. Also, during the pre-pitch review the name Stream (with no prefix) was thrown around, which I distinctly think is a mistake, since it camps on a name that is already part of widely used APIs (even though perhaps its utility is not as great anymore...)

However, as for the name "Source": I object to it, since this type is distinctly NOT the source of elements; instead, it is the stream or series of elements produced BY a source.

If we take a look at similar prior art: we have the Subject family from reactive systems, like AsyncSubject, which might be confusing since the shape of that type is externalized for its capability to yield values. There is also Flow from Kotlin, which is a decent name and definitely harkens to the water metaphor so commonly used for types like this. There are a number of other types similar to this utility, but of all the implementations, this type most resembles Kotlin's Flow type. A number of design characteristics take inspiration from that shape, so it is fair to say the name is familiar within that metaphorical scope (without clobbering existing types in the namespaces of SDKs folks commonly use). So AsyncStream, even though it is perhaps not my favorite, is a good name that represents the type well and offers familiarity to folks who might be learning the language coming from other areas of expertise.


I debated this when implementing it. There is good reason to have a "raw crunchy" style API for folks to use; however, this type really doesn't seem to be that. If you really want the "raw crunchy" thing, then writing an AsyncSequence directly (instead of using an intermediary) is probably worth the minimal extra effort. My logic was this: someone who really wants to eke out the last drop of performance would be willing to write the little extra code to make their own AsyncSequence, whereas this type offers a bit of support for doing so, particularly around cancellation. So I figured that, for the construction folks were asking for (the back-pressure initializer), the extra thing they wanted was the convenience of creating an async sequence that adheres to concepts like cancellation. withCancellationHandler can be a tricky API to use safely, since it often requires external synchronization around state; that initializer provides that safety.

This may be one of those cases where “various people got attached to some meaning of a word and now can’t agree”… 😉 But your argument is weird here: what powers the source does not matter at all; it’s some callback, some iterator, whatever.

The actual source of the stream—meaning “the place where the stream/flow begins” is this type, because it is the first element that is part of the stream. It is the first element in the stream that must adhere to e.g. flow control and really begins the entire chain of (potential) operations. Anything outside of it is clearly not part of the stream, and as such calling the “outside of the stream” pieces a source is very weird, IMHO.


Another way to express that is that it is indeed the source of the Sequence.

Or that a GCD dispatch source timer isn’t the true source of time, but the generator of the timer events.

I see it as a factory, so I like source and generator better, but:

AsyncSequenceFactory

Regarding naming: the stream itself is NOT the source. It is more like a function of the data source, similar to how a View is a function of state; in other words, a View is a functional representation of state, and a stream is a functional representation of a data source. Furthermore, streams can be pipelined/composed together like functions.

AsyncStream, AsyncFlow, or AsyncPipe should be reasonable candidates for this programming paradigm, since they are compatible with well-known functional reactive stream programming concepts.

For reference:


What is a stream?
A stream is a kind of object used to denote sources and sinks of data in most high-level programming languages, including C++. 
The stream abstraction is most commonly used to provide an interface to files, including standard input and standard output.

Unlike a vector, a stream does not guarantee that you can read or write any element at any time. 
Unlike a list, a stream doesn't even guarantee that all the elements are somewhere in memory at a given time. 
It doesn't even necessarily know how many elements there are to be read, or how many elements can be written to it. 
A stream could represent, say, an Internet socket: writing to the stream sends data to a remote host, and reading from the stream receives data. 
Some streams may be read-only, and some might be write-only; some might allow both reading and writing. 
Some streams, such as those providing access to a file on disk, may allow seeking. Others may not.

In theory, all a stream guarantees you is that, at any given time, you can try to fetch the next element from it (reading), or you can try to place another element into it (writing), which will go "after" all the previously inserted elements; possibly both. 

I think there are two ways of looking at this. You are looking at this as "the source of the stream", others are looking at it as "the stream which delivers values from some source".

Both are equally valid, but honestly I prefer AsyncStream. I think it makes sense to de-emphasize "the source of the stream" and in turn give more emphasis to the source of the values.
