SE-0314 (Second review): AsyncStream and AsyncThrowingStream

The GitHub page shows that this is still active. What's status on this review? AsyncStream is about the only thing I miss in my project from the mass concurrency update. And I'm thoroughly looking forward for it!

I think the discussion is about whether this is a Stream, or the Source of a stream...?

1 Like

Regarding the name: As a programmer new to Swift concurrency, I may say: "OK. I know about AsyncSequence and how to use it. But how do I create my own AsyncSequence?" From this viewpoint, AsyncStream would not come to mind. It rather sounds like a competing concept to AsyncSequence. AsyncSequenceSource on the other hand, would look like the thing I am looking for. Wanting to create my own AsyncSequence, I may just start to type AsyncSequence into Xcode and the code completion would suggest AsyncSequenceSource, from where I could look up the documentation.

6 Likes

Yeah, I mentioned "AnyAsyncSequence" above by the same logic. Not that the name is great, but there is precedence for it in the standard library.

I think my favorite is still "AsyncSource", though.

I have used the new version of this proposal in RSocket. After using it, I think AsyncStream. Continuation.Termination should be @frozen so we can switch over it without adding a @unkown default case. We may want to do the same for YieldResult or are we expecting new cases in the future?


Maybe unrelated to this proposal but I got the following error when I tried to assign a closure to onTermination:

Converting non-concurrent function value to '@Sendable (AsyncThrowingStream<Payload, Error>.Continuation.Termination) -> Void' may introduce data races

It took me some time to find out that I need to add @Sendable to the closure type.
(Toolchain: swift-5.5-DEVELOPMENT-SNAPSHOT-2021-07-08-a)

WRT the name, I think AsyncStream is the right choice - at least, going beyond a simple description of the type, and considering how it would look to have variables or function arguments with the type name AsyncStream vs AsyncSource, I think -Stream is the more appropriate description.

Whether or not any particular AsyncStream is also "the place where the stream/flow begins" depends on how you look at things. If the stream is fed by another stream, is it truly the source? Let's imagine a stream like the following:

let streamOne = AsyncStream(unfolding: {
  let bytes = await read(fd)
  return await processBytes(bytes)  
})

You could argue that streamOne is a "source" of some data. But what if I wrap streamOne in another stream?

var it = streamOne.makeAsyncIterator()
let streamTwo = AsyncStream(unfolding: {
  guard let streamOneElement = await it.next() else { return nil }
  return await moreProcessing(streamOneElement)
})

What's the "source" of the resulting elements now - streamOne or streamTwo? If we said AsyncStream was always a source, it means the elements you get by consuming streamTwo are part of a different stream to those emitted by streamOne -- even though consuming streamTwo executes and consumes streamOne. It's weird, and ultimately I think it's a meaningless distinction. It isn't relevant to any function which accepts data in the form of an AsyncStream.

1 Like

I‘m torn on the naming conventions here.

I feel like Combine has established a great way for the types names.

extension Publishers {
  struct Map<Upstream, Output> where Upstream : Publisher
}

The error type is extracted from the Upstream type.

Ideally I think we could do the same.

extension AsyncSequences {
  struct Map<Upstream, Output> where Upstream : AsyncSequence
}

As comparison AsyncStream types use Base instead of Upstream and Downstream names. However I‘m not so sure about the base name for AsyncStream itself if it were nested inside AsyncSequences namespace. Combine has also some global publishers which would fit the current situation.

RxSwift allows users to build an Observable from a closure similarly how AsyncStream is proposed.

Swift misses a few features which would allow initializing opaque AsyncSequence directly from the protocol itself.

AsyncSequence { continuation in
  ...
}

// something like
extension AsyncSequence {
  static func callAsFunction(/* closure parameter */) -> some Self
}
1 Like

That closure syntax is cute, but can that actually be pulled off? I could see that as additive later on if we so choose if it is.

I‘m not 100% sure (from the swift user's perspective) and I cannot test it until late tomorrow when I get back from the vacation. At first I had an init extension my mind, but that has other compiler restrictions and cannot be used without the protocol itself requiring an init, which is non-sense already. Then I remembered that we could use callAsFunction, but did it require an attribute of some sort or does it work in static context? I genuinely don't remember the details from top of my head.

To follow-up w/ that - it currently does not work

extension Sequence {
  public static func callAsFunction<T, State>(state: State, next: @escaping (inout State) -> T?) -> UnfoldSequence<T, State> {
    sequence(state: state, next: next)
  }
}

let s = Sequence(state: "Hello") { state in
  return "test"
}

That does not compile sadly; but perhaps a pitch should be put forth to make it be able to? Sounds like something for somebody to take on ;)

1 Like

I think it would definitely fit AsyncSequece and hide the generic AsyncStream name if gets adapted more broadly, but it seems the callAsFunction proposal authors seemed to disagree but not fully reject the idea: swift-evolution/0253-callable.md at 0c2f85b3ae42539a7cd47fca2473a0bf6f345566 · apple/swift-evolution · GitHub

Does anyone know why AsyncThrowingStream's unfolding initializer is not constrained to Failure == Error like its continuation-based initializer? This makes it easy for AsyncThrowingStream to throw any error possible, no?

1 Like

hmm that probably should be... that is perhaps a bug that might restrict us in the future from any potential fallout from more strongly typed function signatures

How about "AsyncProducer"?

One could argue that this is just an AsyncSequence wrapping a producer. But I think it leads to the right understanding of how to use it. Also I think it's a bit unfortunate that AsyncStream and AsyncSequence almost sound the same.

It's quite often that you have a producer and a consumer, and need them to communicate. And this is the tool to reach out for. And it sounds a bit like Combine's Publisher, but it's better :wink:

3 Likes

IMO, I think this will largely be used as a building block for reactive stream libraries (eg. "swift-streams") more than anything, since most people who will be converting their delegate code, etc. to asynchronous streams will most likely want a more fully-featured reactive streams implementation, with debouncing, merging and the like. In addition, such libraries will have time to develop since the adoption of AsyncSequence will be slow due to the iOS 15 barrier. So people won't actually be confronted with raw AsyncStreams much.

Therefore, I wouldn't mind at all if AsyncStream had a more verbose, descriptive and correct name – my favourite of which would be @ktoso 's AsyncSequenceSource – since I see it more as a building block for larger libraries.

4 Likes

How about AsyncSubject? This would fit well with the Combine nomenclature.

1 Like

Would mean little to those coming without combine as a reference… (myself included)

2 Likes

What we are looking for is a name for something that allows sending values as an async sequence with imperativ statements. Combine came up with Subject - I see no reason why not to use this name again.

Except that it’s commonly used to identify a subscription item in pub/sub systems - which has nothing to do with this use case - so it doesn’t (to me) seem like a natural choice. I’ve already expressed my preference above (inline with @ktoso ) - so won’t reiterate. Just want to point out that Subject has a very different established usage for many users who have not used Combine to which it would be misleading.

3 Likes

Has introducing an asyncSequence free function been considered, as a parallel to the sequence free function? Would that be possible or alleviate the bike-shedding of the type itself?

1 Like