[Pitch] Convenience Async[Throwing]Stream.makeStream methods

Hello everyone,

I would like to pitch a small addition to Async[Throwing]Stream that makes them slightly more convenient to use. I would love to get everyone's opinion on this!

Convenience Async[Throwing]Stream.makeStream methods

  • Proposal: SE-NNNN
  • Authors: Franz Busch
  • Review Manager: TBD
  • Status: Awaiting implementation

Introduction

With SE-0314,
we introduced AsyncStream and AsyncThrowingStream, which act as the source
AsyncSequence types offered by the standard library.

Motivation

Now that Async[Throwing]Stream has been in use for some time, a common pattern
has emerged: passing the continuation and the Async[Throwing]Stream to different places.
This requires escaping the Async[Throwing]Stream.Continuation out of
the closure that is passed to the initialiser.
Escaping the continuation is slightly inconvenient, since it requires a dance
around an implicitly unwrapped optional.
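For illustration, the implicitly-unwrapped-optional dance looks roughly like this when written against the existing public initializer. It relies on AsyncStream invoking its build closure synchronously during initialization; the function name `makeStreamTheOldWay` is purely illustrative.

```swift
/// Illustrative only: escapes the continuation the way the Motivation describes.
func makeStreamTheOldWay() -> (AsyncStream<Int>, AsyncStream<Int>.Continuation) {
    // Declared as an implicitly unwrapped optional because it is only
    // assigned inside the build closure.
    var escaped: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { continuation in
        // The build closure runs synchronously inside init,
        // so `escaped` is set before the initializer returns.
        escaped = continuation
    }
    // Implicit unwrap: this would crash if the closure had not been called.
    let continuation: AsyncStream<Int>.Continuation = escaped
    return (stream, continuation)
}
```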

Proposed solution

In order to fill this gap, I propose to add a new static method makeStream on
AsyncStream and AsyncThrowingStream that returns both the stream
and the continuation.

Detailed design

I propose to add the following code to AsyncStream and AsyncThrowingStream
respectively.

extension AsyncStream {
  /// Creates a new ``AsyncStream`` and an ``AsyncStream/Continuation``.
  ///
  /// - Parameters:
  ///   - elementType: The element type of the sequence.
  ///   - limit: The buffering policy that the stream should use.
  /// - Returns: A tuple which contains the stream and its continuation.
  @_alwaysEmitIntoClient
  public static func makeStream(
      of elementType: Element.Type = Element.self,
      bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
    let storage: _Storage = .create(limit)
    let stream = AsyncStream<Element>(storage: storage)
    let continuation = Continuation(storage)
    return (stream: stream, continuation: continuation)
  }

  @_alwaysEmitIntoClient
  init(storage: _Storage) {
    self.context = _Context(storage: storage, produce: storage.next)
  }
}

extension AsyncThrowingStream {
  /// Creates a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``.
  ///
  /// - Parameters:
  ///   - elementType: The element type of the sequence.
  ///   - failureType: The failure type of the stream.
  ///   - limit: The buffering policy that the stream should use.
  /// - Returns: A tuple which contains the stream and its continuation.
  @_alwaysEmitIntoClient
  public static func makeStream(
      of elementType: Element.Type = Element.self,
      throwing failureType: Failure.Type = Failure.self,
      bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) {
    let storage: _Storage = .create(limit)
    let stream = AsyncThrowingStream<Element, Failure>(storage: storage)
    let continuation = Continuation(storage)
    return (stream: stream, continuation: continuation)
  }

  @_alwaysEmitIntoClient
  init(storage: _Storage) {
    self.context = _Context(storage: storage, produce: storage.next)
  }
}
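Outside the standard library, `_Storage` and `_Context` are not accessible, but roughly the same shape can be approximated on top of the public initializers. The sketch below uses a hypothetical name, `makeStreamPair`, to avoid colliding with the pitched method. One limitation worth noting: the public `AsyncThrowingStream` initializer is only available when `Failure == Error`, so a user-space version cannot offer the `throwing failureType:` parameter for arbitrary failure types.

```swift
extension AsyncStream {
    /// Hypothetical user-space stand-in for the pitched `makeStream`,
    /// built only on the public closure-based initializer.
    static func makeStreamPair(
        of elementType: Element.Type = Element.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
        var continuation: Continuation!
        // The build closure runs synchronously, so `continuation` is set here.
        let stream = AsyncStream(elementType, bufferingPolicy: limit) { continuation = $0 }
        return (stream: stream, continuation: continuation)
    }
}

extension AsyncThrowingStream where Failure == Error {
    /// Same idea for the throwing variant; restricted to `Failure == Error`
    /// because that is the only public initializer available.
    static func makeStreamPair(
        of elementType: Element.Type = Element.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) {
        var continuation: Continuation!
        let stream = AsyncThrowingStream(elementType, bufferingPolicy: limit) { continuation = $0 }
        return (stream: stream, continuation: continuation)
    }
}
```

Because of the `Element.Type = Element.self` default, both `AsyncStream.makeStreamPair(of: Int.self)` and `AsyncStream<Int>.makeStreamPair()` type-check.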

Source compatibility, Effect on ABI stability, Effect on API resilience

As this is an additive change, it should not cause any compatibility, stability, or resilience problems. The only potential issue is that someone who has already run into this shortcoming may have defined their own makeStream methods.

Alternatives considered

Return a concrete type instead of a tuple

My initial proposal used a concrete type as the return type of the factory method;
however, I walked it back since back-deployment concerns were raised about introducing a new type.

I still believe that there is value in providing a concrete type, since
it is easier to work with than a tuple and gives documentation a natural home.

extension AsyncStream {
  /// Simple struct for the return type of ``AsyncStream/makeStream(of:bufferingPolicy:)``.
  public struct NewStream {
    /// The actual stream.
    public let stream: AsyncStream<Element>
    /// The continuation of the stream
    public let continuation: AsyncStream<Element>.Continuation

    @inlinable
    internal init(
        stream: AsyncStream<Element>,
        continuation: AsyncStream<Element>.Continuation
    ) {
        self.stream = stream
        self.continuation = continuation
    }
  }
}
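To compare ergonomics, here is a user-space sketch of the concrete-type alternative, nested in an extension the way `NewStream` is. The name `Pair` and its initializer are hypothetical. With an initializer, creation reads as `AsyncStream<Int>.Pair()`, and each property can carry its own documentation; unlike a tuple, though, the value cannot be destructured with `let (stream, continuation) = ...`.

```swift
extension AsyncStream {
    /// Hypothetical concrete stream/continuation pair.
    struct Pair {
        /// The consumer side: iterate this to receive elements.
        let stream: AsyncStream<Element>
        /// The producer side: yield elements and finish through this.
        let continuation: AsyncStream<Element>.Continuation

        init(bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
            var escaped: AsyncStream<Element>.Continuation!
            // The build closure runs synchronously, so `escaped` is set before use.
            self.stream = AsyncStream<Element>(bufferingPolicy: limit) { escaped = $0 }
            self.continuation = escaped
        }
    }
}
```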

Do nothing alternative

We could just leave the current creation of Async[Throwing]Stream as is;
however, since it is part of the standard library we should provide
a better method to create a stream and its continuation.


Great improvement: the shape of the AsyncStream type is very awkward, and telling people that it's OK to escape THAT SPECIFIC continuation but DO NOT escape continuations in general has been very weird. Nice improvement, definitely +1 :+1:

I really do think the NewStream type listed under Alternatives Considered is a much better return type than the tuple, though. It leaves room in case we want to do something else there in the future, and it also gives us a place to put documentation, which we cannot do with a tuple, so I'd really argue for that API shape rather than the tuple.

Other than that, great idea and I hope we can get this in :+1:


I personally agree that a concrete return type is nicer. I would love to hear more opinions on this topic, especially from the Core/Language workgroup. We have opted for a concrete type in our NIO root AsyncSequence as well: swift-nio/NIOAsyncSequenceProducer.swift at 6c31c9a46c89f0a1653d932a8d6b3454b1817468 · apple/swift-nio · GitHub


That'd be awesome, we've done this too internally with the tuple approach thus:

internal func makeStreamAndContinuation<T>(for _: T.Type) -> (AsyncStream<T>, AsyncStream<T>.Continuation) {
    var resultContinuation: AsyncStream<T>.Continuation?
    let asyncStream = AsyncStream<T> { continuation in
        resultContinuation = continuation
    }

    guard let resultContinuation = resultContinuation else {
        fatalError("makeStreamAndContinuation internal error, couldn't extract resultContinuation")
    }
    return (asyncStream, resultContinuation)
}

A concrete return type would be nice (for the documentation as already noted).


Big +1 on this. I have my own utility of the tuple variety, so I will be happy to get rid of that! :slight_smile:

I'm also of the opinion that a concrete type would be nicer, but then the NewStream type and the makeStream name need updating to reflect that you'll be getting a stream/continuation pair rather than a stream. AsyncStream.makeStream says to me I'll be getting an AsyncStream, not something that wraps an AsyncStream.

Something along the lines of makeStreamContinuationPair and StreamContinuationPair.

Or: a top-level type that makes it even clearer. Rough outline something along lines of:

struct AsyncPipe<Element> {
  typealias Continuation = AsyncStream<Element>.Continuation
  private let continuation: Continuation
  init(
    of elementType: Element.Type = Element.self,
    bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) { ... }
  var stream: AsyncStream<Element> { ... }
  func yield(_ element: Element) -> Continuation.YieldResult { continuation.yield(element) }
  /* other yield methods */
}
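One possible way to fill in the outline above, assuming the stream is stored rather than computed; the method bodies are my own guess, not the poster's. Note that a single value hands both the producer API (`yield`, `finish`) and the consumer API (`stream`) to whoever holds it, which is the concern raised in the next reply.

```swift
struct AsyncPipe<Element> {
    typealias Continuation = AsyncStream<Element>.Continuation

    private let continuation: Continuation
    /// The consumer side of the pipe (stored here rather than computed).
    let stream: AsyncStream<Element>

    init(
        of elementType: Element.Type = Element.self,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) {
        var escaped: Continuation!
        // The build closure runs synchronously, so `escaped` is set before use.
        stream = AsyncStream(elementType, bufferingPolicy: limit) { escaped = $0 }
        continuation = escaped
    }

    /// Forwards to the wrapped continuation.
    @discardableResult
    func yield(_ element: Element) -> Continuation.YieldResult {
        continuation.yield(element)
    }

    /// Ends the stream for consumers.
    func finish() {
        continuation.finish()
    }
}
```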

Thanks for the feedback!

Naming for the method and the type can definitely be bike-shedded!

However, I have to disagree on creating a top-level type that wraps both, especially if it shadows the yield method. One thing users of AsyncStream should do, where possible, is pass the actual AsyncStream and the AsyncStream.Continuation to two different places, i.e. the consumer and the producer. This way, consumers and producers only get the interface they need. Furthermore, it makes it possible to track whether a producer or consumer is still holding onto its respective type and to terminate the sequence if nobody is left (we are doing this in NIO).
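A minimal sketch of that split, with hypothetical `produce`, `consume`, and `runPipeline` functions: the producer receives only the continuation and the consumer receives only the stream, so neither can reach the other side's API.

```swift
/// Hypothetical producer: it can only yield and finish.
func produce(into continuation: AsyncStream<Int>.Continuation) {
    for value in 1...3 {
        continuation.yield(value)
    }
    continuation.finish()
}

/// Hypothetical consumer: it can only iterate.
func consume(_ stream: AsyncStream<Int>) async -> Int {
    var sum = 0
    for await value in stream {
        sum += value
    }
    return sum
}

/// Wires the two sides together and hands each one only what it needs.
func runPipeline() async -> Int {
    var escaped: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { escaped = $0 }
    let continuation: AsyncStream<Int>.Continuation = escaped
    // The producer runs in its own task; the unbounded buffer keeps every value.
    Task { produce(into: continuation) }
    return await consume(stream)
}
```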


Got it, makes sense. I had no idea an AsyncSequence would be terminated if its continuation was released. That's pretty neat!


This can't be generalised. As far as I know, AsyncStream currently only terminates if both sides are dropped. The NIOAsyncSequenceProducer terminates when either the consumer is dropped, or the Source is dropped and there are no leftover elements.

We opted to do this since it allows us to stop the underlying source of production as early as possible.
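Whatever the exact drop semantics, AsyncStream does let a producer learn about termination early: `onTermination` is invoked when the consumer's task is cancelled or when `finish()` is called, and `yield` reports `.terminated` afterwards. A sketch, where `makeObservedStream` and its `stopSource` callback are hypothetical stand-ins for shutting down a real source of production:

```swift
/// Hypothetical helper: `stopSource` stands in for shutting down whatever
/// feeds the stream (a socket, a timer, ...).
func makeObservedStream(
    stopSource: @escaping @Sendable (AsyncStream<Int>.Continuation.Termination) -> Void
) -> (AsyncStream<Int>, AsyncStream<Int>.Continuation) {
    var escaped: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { escaped = $0 }
    let continuation: AsyncStream<Int>.Continuation = escaped
    // Invoked with .cancelled (consumer task cancelled) or .finished (finish() called).
    continuation.onTermination = { termination in
        stopSource(termination)
    }
    return (stream, continuation)
}
```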

I'd be happy to see this in the standard library as it's really handy for testing. We maintain a version of this that we call streamWithContinuation. We also decided to return a tuple to be able to leverage destructuring in the situations where it's not necessary to give a full name to the pieces (usually in tests):

let (stream, continuation) = AsyncStream<Void>.streamWithContinuation()

And you can always not destructure and instead use the tuple field names:

let buttonTapped = AsyncStream<Void>.streamWithContinuation()
buttonTapped.stream
buttonTapped.continuation

We've used this operation quite a bit in practice, and I can't say that we've ever passed the stream+continuation around at the same time, hence using a tuple was never really a problem.

It also seemed like there was some precedent for using lightweight tuples in the standard library, such as in sequence.

But I also think a dedicated type would work just fine. The ability to destructure is not the most important aspect of this operation.


It would be a really cool feature! +1!

I think returning a concrete type is nicer than using a tuple. IMHO it gives the freedom to store the stream and continuation together or to pass them separately to other functions/initialisers.

Agree with @ktoso point about documentation.


Big +1 from me :slight_smile:

Sounds like many of us have implemented our own version of this. I have my own personal AsyncStream type that wraps (and shadows) _Concurrency.AsyncStream, but gives direct access to the continuation as a property of the stream. Of course I don't recommend my approach in general, since it can be rather confusing for other people reading the code that don't realize it isn't the real AsyncStream :sweat_smile:.

+1

Thanks for the pitch.

An alternate name could be AsyncStream.pipe(). ReactiveSwift has this kind of naming I believe. It also resonates in the Unix world.

This is perfectly possible with a tuple, you don't have to destructure it and are free to store as an aggregate value in a single constant/variable.

let (stream, continuation) = AsyncStream.makeStream(...)
let aggregate = AsyncStream.makeStream(...)

OTOH, destructuring is not available for a concrete type and the first line in the snippet above would not compile.


At this point, I really would like to get somebody from the @language-workgroup to chime in and state what they prefer both in terms of usability and back-deployment.


It's a bit premature to be pinging the language workgroup for this change after 5 days of pitch over a weekend.

Something that might help is to refer to the test criteria for standard library additions that have been laid out in the past: is it non-trivially composed, helps avoid common pitfalls, significantly improves readability. This does meet these criteria AFAICT.

If the pitch is achieving consensus after a while, and the proposal is easy to implement, the next step is to put up an evolution proposal and an implementation as PRs against the appropriate repos. But I think the pitch needs longer to get to that stage.

Two concerns from me (not language group feedback, just my own):

  • The shape of this API is unusual. We don't normally put creation static methods on protocols. Usually initializers on concrete types are preferred. The need for an Element meta type in the signature is also a sign something is off here.

  • I'd like to see some scrutiny/discussion of the safety aspects of this design. Can it be misused in a way that leads to leaks or unexpected behavior? Personally I'm not familiar with the domain to be confident but maybe others can chime in (@Philippe_Hausler maybe)

Back deployment isn't something the language group generally comments on; that is a decision for the platform owner, not evolution. But since this doesn't declare any new types, it can probably make use of the currently-in-review @backDeploy feature.


Thanks for chiming in here and sorry for the early ping!

We are in agreement here, I just misread your point. Sorry about that!
I would disagree on this point. I think this avoids a very common pitfall, which is that it is not really clear to users that you can escape the Continuation from the closure-based init. Even less clear is that it is safe to do so. Just from the feedback in this thread over 5 days, we see that a lot of people have created conveniences for this.

This proposal is not about adding static methods on any protocols. It is rather about extending the concrete types AsyncStream and AsyncThrowingStream with these two new convenience static methods. We cannot use an init for this since we need to return two separate types. The Element type in the parameters of the makeStream() method is just to make it easier for users to spell it and follows our spellings for task groups.

let (stream, continuation) = AsyncStream<Int>.makeStream()

can become this:

let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

This is one of the goals that this proposal wants to achieve. It should be clear that actually escaping the Continuation is safe and with this new API we make this very clear. I have already talked with @Philippe_Hausler about this and would love if he can chime in here as well!

This is good to know, since one of the open discussion points is a new concrete type vs. the proposed tuple approach. I have read in a few places already that concrete types can't be back-deployed, and from your statement I gather that still stands. That would make the tuple more favourable in my opinion, since this kind of convenience benefits a lot from being back-deployable.


AFAICT Ben said it fulfills those criteria so seems you are in agreement.


Oh you are right, I must have imagined a not in there. Thanks for clarifying that!

Since you mention that an alternative is a concrete wrapper type with both the stream and the continuation as properties, would it then be possible to use an init instead of a static creation method with that alternative?

Since platform owners and not the Swift Evolution process control what is back-deployed, I would be very hesitant to compromise on the proposed design of a feature on the basis of what can be back-deployed. It would set a very poor precedent to contort the design of permanent additions to the standard library on the basis of transitional bring-up matters that are not even within the full control of the Swift project.


Yeah, we could, but that is super hard to discover in my opinion. It would end up being something like this:

let newStream = AsyncStream<Int>.StreamWithContinuation()

I will add it to the alternatives considered section though since it is worth mentioning!

I think that's a fair point! Thanks for bringing it up.

I went with the tuple because I heard concerns about the back-deployability of a concrete type, but I agree that we shouldn't compromise the design because of individual platforms' abilities here.