SE-0314 (Second review): AsyncStream and AsyncThrowingStream

This may be one of those cases where “various people got attached to some meaning of a word and can’t agree now”… ;) But your argument is weird here: what powers the source does not matter at all; it’s some callback, some iterator, whatever.

The actual source of the stream, meaning “the place where the stream/flow begins”, is this type, because it is the first element that is part of the stream. It is the first element in the stream that must adhere to, e.g., flow control, and it really begins the entire chain of (potential) operations. Anything outside of it is clearly not part of the stream, so calling those “outside of the stream” pieces a source is very weird, IMHO.
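For instance, buffering is one place where the stream type itself, rather than whatever feeds it, applies flow control. The sketch below assumes the shipped AsyncStream initializer that takes a `bufferingPolicy:` argument; `latestReadings()` is a hypothetical name used only for illustration.

```swift
// Hypothetical producer: yields five readings before anyone iterates.
// With .bufferingNewest(2), the stream itself keeps only the two most recent.
func latestReadings() -> AsyncStream<Int> {
    AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(2)) { continuation in
        for reading in 1...5 {
            // Once the buffer is full, the oldest buffered reading is dropped.
            continuation.yield(reading)
        }
        continuation.finish()
    }
}
```

Here the producer yields five values, but the stream keeps only the newest two, so a consumer that starts iterating afterwards sees 4 and 5.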


Another way to express that is that it is indeed the source of the Sequence.

Or that a GCD dispatch source timer isn’t the true source of time, but the generator of the timer events.

I see it as a factory, so I like source and generator better, but:

AsyncSequenceFactory

As for the naming: a Stream itself is NOT the source. Rather, it is a function of a data source, much as a View is a function of state; in other words, a View is a functional representation of state and a Stream is a functional representation of a data source. Furthermore, streams can be pipelined/composed together like functions.

AsyncStream, AsyncFlow, or AsyncPipe would all be reasonable candidates for this programming paradigm, since they are consistent with the well-known concept of functional reactive stream programming.

For reference:

https://www.quora.com/What-are-streams-in-programming


What is a stream?
A stream is a kind of object used to denote sources and sinks of data in most high-level programming languages, including C++. The stream abstraction is most commonly used to provide an interface to files, including standard input and standard output.

Unlike a vector, a stream does not guarantee that you can read or write any element at any time. Unlike a list, a stream doesn't even guarantee that all the elements are somewhere in memory at a given time. It doesn't even necessarily know how many elements there are to be read, or how many elements can be written to it. A stream could represent, say, an Internet socket: writing to the stream sends data to a remote host, and reading from the stream receives data. Some streams may be read-only, and some might be write-only; some might allow both reading and writing. Some streams, such as those providing access to a file on disk, may allow seeking. Others may not.

In theory, all a stream guarantees you is that, at any given time, you can try to fetch the next element from it (reading), or you can try to place another element into it (writing), which will go "after" all the previously inserted elements; possibly both. 
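In Swift's AsyncSequence model, the reading half of that guarantee maps onto an iterator's `next()`: each call tries to fetch one more element and returns `nil` once the stream has finished. A minimal sketch, with `drain(_:)` as a hypothetical helper name:

```swift
// Hypothetical helper: pull elements one at a time, which is what
// `for await` does under the hood.
func drain(_ stream: AsyncStream<String>) async -> [String] {
    var iterator = stream.makeAsyncIterator()
    var received: [String] = []
    // next() suspends until an element is available, or returns nil at the end.
    while let element = await iterator.next() {
        received.append(element)
    }
    return received
}
```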

I think there are two ways of looking at this. You are looking at this as "the source of the stream"; others are looking at it as "the stream which delivers values from some source".

Both are equally valid, but honestly I prefer AsyncStream. I think it makes sense to de-emphasize "the source of the stream" and in turn give more emphasis to the source of the values.


I know I’m late to the review. I generally like it and don’t have strong feelings about the naming.

I was looking at the implementation and noticed that the doc comment says multiple concurrent calls to next() will trigger a fatal error (although I couldn’t actually see fatalError being called). However, I couldn’t find anything in this proposal, or in the AsyncSequence proposal, stating that concurrent calls to next() are forbidden.

There may be valid use cases, but concurrent next() calls would definitely complicate the implementation. Unless I have simply missed it, the proposals should state this explicitly, one way or the other.

Will any of these stdlib-vended AsyncSequences provide a way to “append” values to them? If not, how could we use them to yield new elements after initialization? Is storing the continuation provided in the init closure valid?

Never mind, I just saw the WWDC video covering that.

Yes. Unlike other continuations, the continuation passed to the construction closure is capturable and Sendable.
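A minimal sketch of storing that continuation, relying only on the AsyncStream initializer itself: the build closure runs synchronously during initialization, so the continuation can be captured there and used later, even from another task. `makeStreamWithContinuation(of:)` is a hypothetical helper; Swift 5.9 later added `AsyncStream.makeStream(of:bufferingPolicy:)` (SE-0388) for the same purpose.

```swift
// Hypothetical helper: capture the continuation so elements can be yielded
// after the stream has been created. The build closure runs inside
// AsyncStream's initializer, so `captured` is set before we return.
func makeStreamWithContinuation<Element>(
    of _: Element.Type
) -> (AsyncStream<Element>, AsyncStream<Element>.Continuation) {
    var captured: AsyncStream<Element>.Continuation!
    let stream = AsyncStream<Element> { continuation in
        captured = continuation
    }
    return (stream, captured!)
}
```

A producer task can then call `yield(_:)` and `finish()` on the returned continuation while another task iterates the stream with `for await`.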

Cool. I'm thinking about creating an AsyncSequence in my pet project similar to Rx Subjects, which provide onNext, onCompleted, and onError methods and allow publishing elements/events into the sequence.
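A rough sketch of that idea, assuming an AsyncThrowingStream underneath: onNext maps to `yield(_:)`, onCompleted to `finish()`, and onError to `finish(throwing:)`. The `Subject` type is hypothetical. One difference from Rx: the stream is meant for a single consumer and does not multicast each element to several subscribers.

```swift
// Hypothetical Rx-style subject built on AsyncThrowingStream.
// Single consumer: unlike an Rx Subject, it does not multicast.
final class Subject<Element> {
    let stream: AsyncThrowingStream<Element, Error>
    private let continuation: AsyncThrowingStream<Element, Error>.Continuation

    init() {
        // The build closure runs during init, so `captured` is set right away.
        var captured: AsyncThrowingStream<Element, Error>.Continuation!
        stream = AsyncThrowingStream { captured = $0 }
        continuation = captured!
    }

    func onNext(_ element: Element) { continuation.yield(element) }
    func onCompleted() { continuation.finish() }
    func onError(_ error: Error) { continuation.finish(throwing: error) }
}
```

Elements published before an error are still delivered to the consumer, and the error is thrown from the `for try await` loop afterwards.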

The GitHub page shows that this is still active. What's the status of this review? AsyncStream is about the only thing I miss in my project from the big concurrency update, and I'm really looking forward to it!

I think the discussion is about whether this is a Stream, or the Source of a stream...?


Regarding the name: As a programmer new to Swift concurrency, I may say: "OK. I know about AsyncSequence and how to use it. But how do I create my own AsyncSequence?" From this viewpoint, AsyncStream would not come to mind. It rather sounds like a competing concept to AsyncSequence. AsyncSequenceSource on the other hand, would look like the thing I am looking for. Wanting to create my own AsyncSequence, I may just start to type AsyncSequence into Xcode and the code completion would suggest AsyncSequenceSource, from where I could look up the documentation.
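For that workflow, it may help that AsyncStream is itself a concrete AsyncSequence, so a stream built from a closure can be handed to any API written against the protocol. A minimal sketch; `countdown(from:)` and `collect(_:)` are hypothetical names for illustration.

```swift
// Build an AsyncSequence from a closure instead of hand-writing a conformance.
func countdown(from start: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in stride(from: start, through: 0, by: -1) {
            continuation.yield(n)
        }
        continuation.finish()
    }
}

// Generic code sees only the AsyncSequence protocol.
// Marked `throws` for simplicity, since a generic AsyncSequence may throw.
func collect<S: AsyncSequence>(_ sequence: S) async throws -> [S.Element] {
    var result: [S.Element] = []
    for try await element in sequence {
        result.append(element)
    }
    return result
}
```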


Yeah, I mentioned "AnyAsyncSequence" above by the same logic. Not that the name is great, but there is precedent for it in the standard library.

I think my favorite is still "AsyncSource", though.

I have used the new version of this proposal in RSocket. After using it, I think AsyncStream.Continuation.Termination should be @frozen so we can switch over it without adding an @unknown default case. We may want to do the same for YieldResult, or are we expecting new cases in the future?


Maybe unrelated to this proposal but I got the following error when I tried to assign a closure to onTermination:

Converting non-concurrent function value to '@Sendable (AsyncThrowingStream<Payload, Error>.Continuation.Termination) -> Void' may introduce data races

It took me some time to find out that I need to add @Sendable to the closure type.
(Toolchain: swift-5.5-DEVELOPMENT-SNAPSHOT-2021-07-08-a)
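Putting both points from this post together: the handler below is annotated `@Sendable`, as described above, and switches over `Termination` with an `@unknown default`, which the compiler asks for when the enum is not `@frozen`. A sketch assuming a stand-in `Payload` type (hypothetical, not RSocket's) and a hypothetical lock-protected `TerminationLog` used only to observe the handler being called.

```swift
import Foundation

// Hypothetical stand-in for the RSocket payload type in the post.
struct Payload: Sendable {
    let value: Int
}

// Hypothetical thread-safe log so the @Sendable handler can record events.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func append(_ entry: String) {
        lock.lock(); defer { lock.unlock() }
        entries.append(entry)
    }

    var all: [String] {
        lock.lock(); defer { lock.unlock() }
        return entries
    }
}

func describe(_ termination: AsyncThrowingStream<Payload, Error>.Continuation.Termination) -> String {
    switch termination {
    case .finished(let error):
        return error == nil ? "finished" : "failed"
    case .cancelled:
        return "cancelled"
    @unknown default:  // expected by the compiler unless Termination is @frozen
        return "unknown"
    }
}

func makePayloadStream(log: TerminationLog) -> AsyncThrowingStream<Payload, Error> {
    AsyncThrowingStream { continuation in
        // Spelling out @Sendable matches the type of `onTermination`.
        continuation.onTermination = { @Sendable termination in
            log.append(describe(termination))
        }
        continuation.yield(Payload(value: 1))
        continuation.finish()
    }
}
```

Writing `@Sendable` explicitly on the closure literal is harmless even where a newer compiler would infer it.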

Regarding the name, I think AsyncStream is the right choice. Going beyond a simple description of the type, and considering how variables or function arguments typed as AsyncStream would look compared with AsyncSource, I think -Stream is the more appropriate description.

Whether or not any particular AsyncStream is also "the place where the stream/flow begins" depends on how you look at things. If the stream is fed by another stream, is it truly the source? Let's imagine a stream like the following:

let streamOne = AsyncStream(unfolding: {
  let bytes = await read(fd)
  return await processBytes(bytes)  
})

You could argue that streamOne is a "source" of some data. But what if I wrap streamOne in another stream?

var it = streamOne.makeAsyncIterator()
let streamTwo = AsyncStream(unfolding: {
  guard let streamOneElement = await it.next() else { return nil }
  return await moreProcessing(streamOneElement)
})

What's the "source" of the resulting elements now - streamOne or streamTwo? If we said AsyncStream was always a source, it means the elements you get by consuming streamTwo are part of a different stream to those emitted by streamOne -- even though consuming streamTwo executes and consumes streamOne. It's weird, and ultimately I think it's a meaningless distinction. It isn't relevant to any function which accepts data in the form of an AsyncStream.
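The same wrapping can also be written with the continuation-based initializer, which avoids sharing a mutable iterator with the unfolding closure. A sketch; `piped(_:_:)` is a hypothetical helper, and cancelling the inner task from `onTermination` is one reasonable design choice rather than the only one.

```swift
// Hypothetical helper: a downstream stream that consumes `upstream` and
// yields transformed elements, mirroring streamTwo wrapping streamOne.
func piped<T: Sendable, U: Sendable>(
    _ upstream: AsyncStream<T>,
    _ transform: @escaping @Sendable (T) async -> U
) -> AsyncStream<U> {
    AsyncStream { continuation in
        let task = Task {
            for await element in upstream {
                let transformed = await transform(element)
                continuation.yield(transformed)
            }
            continuation.finish()
        }
        // If the consumer stops early, stop pulling from upstream too.
        continuation.onTermination = { @Sendable _ in task.cancel() }
    }
}
```

Consuming the downstream stream drives the upstream one, which is exactly why asking which of the two is "the source" stops being a useful question.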


I’m torn on the naming conventions here.

I feel like Combine has established a great naming convention for these types.

extension Publishers {
  struct Map<Upstream, Output> where Upstream : Publisher
}

The error type is extracted from the Upstream type.

Ideally I think we could do the same.

extension AsyncSequences {
  struct Map<Upstream, Output> where Upstream : AsyncSequence
}

By comparison, the standard library's async sequence types (such as AsyncMapSequence) use Base instead of Upstream/Downstream names. However, I’m not so sure what the base name for AsyncStream itself would be if it were nested inside an AsyncSequences namespace. Combine also has some global publishers, which would fit the current situation.
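To make the comparison concrete, here is a rough sketch of what a Combine-style `Map<Upstream, Output>` could look like for async sequences. `AsyncSequences` is a hypothetical namespace, not a standard library type. For simplicity, this sketch's `next()` always throws instead of carrying over the upstream's error behavior the way Combine's `Map` carries over `Upstream.Failure`.

```swift
// Hypothetical namespace mirroring Combine's `Publishers`; not a stdlib type.
enum AsyncSequences {}

extension AsyncSequences {
    struct Map<Upstream: AsyncSequence, Output>: AsyncSequence {
        typealias Element = Output

        let upstream: Upstream
        let transform: (Upstream.Element) async -> Output

        struct AsyncIterator: AsyncIteratorProtocol {
            var upstreamIterator: Upstream.AsyncIterator
            let transform: (Upstream.Element) async -> Output

            // Always `throws` in this sketch; the error type is not
            // derived from Upstream here.
            mutating func next() async throws -> Output? {
                guard let element = try await upstreamIterator.next() else { return nil }
                return await transform(element)
            }
        }

        func makeAsyncIterator() -> AsyncIterator {
            AsyncIterator(upstreamIterator: upstream.makeAsyncIterator(), transform: transform)
        }
    }
}
```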

RxSwift allows users to build an Observable from a closure, similarly to how AsyncStream is proposed.

Swift lacks a few features that would allow initializing an opaque AsyncSequence directly from the protocol itself:

AsyncSequence { continuation in
  ...
}

// something like
extension AsyncSequence {
  static func callAsFunction(/* closure parameter */) -> some Self
}

That closure syntax is cute, but can that actually be pulled off? If it can, I could see it being added later on as an additive change, if we so choose.

I’m not 100% sure (from the Swift user's perspective), and I can't test it until late tomorrow when I get back from vacation. At first I had an init extension in mind, but that has other compiler restrictions and can't be used without the protocol itself requiring an init, which is already nonsense. Then I remembered that we could use callAsFunction, but does it require an attribute of some sort, or does it work in a static context? I genuinely don't remember the details off the top of my head.

To follow up on that: it currently does not work.

extension Sequence {
  public static func callAsFunction<T, State>(state: State, next: @escaping (inout State) -> T?) -> UnfoldSequence<T, State> {
    sequence(state: state, next: next)
  }
}

let s = Sequence(state: "Hello") { state in
  return "test"
}

Sadly, that does not compile; but perhaps a pitch should be put forth to make it possible? Sounds like something for somebody to take on ;)
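For contrast, SE-0253's `callAsFunction` does work when the call is made on an instance rather than on a type or protocol name. A sketch; `SequenceMaker` is a hypothetical type that simply forwards to the standard library's `sequence(state:next:)`.

```swift
// Hypothetical callable value: `maker(...)` is sugar for `maker.callAsFunction(...)`.
struct SequenceMaker {
    func callAsFunction<T, State>(
        state: State,
        next: @escaping (inout State) -> T?
    ) -> UnfoldSequence<T, State> {
        sequence(state: state, next: next)
    }
}
```

With `let makeSequence = SequenceMaker()`, a call such as `makeSequence(state: 3) { ... }` compiles, whereas `Sequence(state:)` is parsed as an initializer call on the protocol and does not.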


I think it would definitely fit AsyncSequence and hide the generic AsyncStream name if it gets adopted more broadly, but the callAsFunction proposal authors seemed to disagree with, though not fully reject, the idea: swift-evolution/0253-callable.md at 0c2f85b3ae42539a7cd47fca2473a0bf6f345566 · apple/swift-evolution · GitHub