I was a bit overloaded recently and missed the review deadline is up here...
Glad this is moving forward!
The latest adjustments based on feedback do address my greatest concerns voiced previously and make the type useful and actually usable, glad to see it.
I'll focus on the topics @Douglas_Gregor requested to focus on then:
Naming
First the good things! All those are very nice:
- the
bufferingOldest
, bufferingNewest
, and unbounded
names are very good!
- It's the inverse of how I'm used to think about it but it's a great phrasing and actually easier to understand the behavior probably, +1 on those.
- the
enqueued(remaining)
is good, though making any use of the "remaining" value is likely to be racy, to be honest I'd drop that from the API and just do enqueued
but I can see for a single producer workflow this may be fun to utilize and build an efficient prefetch perhaps.
- Question:
init(unfolding:onCancel:)
is nice, though shouldn't the produce
be allowed to throw
as well?
The actual type: AsyncSequenceSource
I feel very strongly that calling this a "(Async)Stream" is a very bad idea, because it isn't "the stream".
It is a way to "generate" the "beginning" (source, publisher, producer, origin...) of a stream, and I'm glad the naming is being asked for feedback rather than steamrolling ahead with it
In the past I spent a long time naming such exact type in the past and I really do think it captured the best and most important semantics in the name so I'll bring it up here again.
The type's primary goal in life is:
- bridge "non stream" / "non async" / "non back-pressure aware" APIs with the
AsyncSequence
- as such the name should imply it is closely related to the async sequence
The type inherently behaves like a "queue":
- it is either bounded or unbounded, just like a queue
- we offer elements to it from "the outside", from any thread, just like a queue
- if it is bounded, it may drop elements, just like a bounded queue
As such, I really think adopting similar naming would be beneficial. At the same time, it gets tricky with too many concepts in the name and "AsyncSequenceSourceQueue
" while very precise, I guess would not be met with much love.
I think we could drop the Queue
word and focus on the other important part of this API:
- this type is NOT just some arbitrary part of a stream, it is always the "beginning", the source, of the stream.
As such, I think we should combine these two pieces of information into: AsyncSequence
+ Source
= AsyncSequenceSource
.
It makes it incredibly clear that:
- this is used to "begin" async sequences,
- there may be various kinds of sources -- we already have two:
unfold
and the continuation-based
one, there can be other types of async sequence sources, such as timers etc which may be useful to add here in the future.
The API would end up reading like this:
AsyncSequenceSource(unfold: ...)
OR AsyncSequenceSource.unfold { ... }
AsyncSequenceSource(of: Int.self, .bufferingNewest(10)) { c in ... }
- notice the
of: ...
which is to align the API shape with TaskGroup(of: Int.self)
which we have today
And it is easy to talk about -- "use a sequence source" etc. Pushback against this name has been because "other sources exist" but I don't think this is a good reason, because we're designing core types for swift concurrency here, and these are what will be showing up more and more, and not less and less, so they deserve to have good names.
As someone who worked on the RS specification I'll chime in to this a little bit. While the specification is called Reactive Streams, none of the implementations make use of the word "Stream" in their APIs, this is for good reason: "the stream" is the composition of many stages/elements/operators, and as such no single operator/stage/element is "a stream" but rather a Source
, Flow
, or Sink
(alternatively a Publisher
, Processor
and Subscriber
, or even Producer
, Processor
, and Consumer
(trivia, these used to be the original names )). And if a bunch of them get together and actually run -- that's the stream.
In the same way as tracing systems never have a "trace" object but have "spans" and a "bunch of spans" interconnected using specific rules is the "distributed trace".
While still on the naming theme, and not challanging any of the functionality but only the naming inconsistencies. These are small but annoying inconsistencies that would be nice to iron out...
"yield" -> offer
or push
I have some issues with "Yield
" in general though, and as such with YieldResult
as a type name... these are not "yield" operations to me. I've never seen an API where a "yield" can fail, he this is exactly what this is.
This operation is exactly what is typical with bounded queue implementations and usually named "push
" or "offer
". I especially like offer
because it expresses prefectly what this operation is, and that such offer may be rejected (if no buffer space is available). This is an important feature of the API and the name should reflect it.
The type name YieldResult
would be adjustec accordingly, OfferResult
or Offered.dropped
(Offered.enqueued
) or similar, allowing for:
switch source.offer(...) {
case .terminated:
case .dropped(...):
case .enqueued(...):
Looking at this one realizes that we're mixing terminology here again... and neither the "yield" or "offer" make sense to reply with "enqueued" (!).
The result type therefore to be consistent in terminology used could be:
enum ElementOffer {
case accepted(remaining: Int)
case dropped(Element)
case terminated
}
which would be a bit cleaner if we're to stick with offer()
it is normal to "accept an offer" after all.
If we went with "push()
" the enqueued
makes sense again because we're back in "queue terminology" IMHO.
Minor: finish()
-> complete()
I'm very weirded out by the "finish" to be honest, it's not a hill worth dying on I guess but it's very weird to me, rather this could be more similar to completion where completion is one of: successul, failed, cancelled.
as @DevAndArtist has called out above in the thread.
Nitpicks while here, not a hill I'll die on but still worth pointing out:
BufferingPolicy.unbounded
is missing documentation.
- I would suggest adding documentation suggesting to not use this value as it can easily lead to explosive, unlimited, memory usage growth.
- In apps that'll just kill the app, but it is traumatic for server systems where one such bad/silly stream growing forever would OOM and crash the entire server.
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded,
I said this in prior reviews and dicussions, unbounded default is IMHO a bad idea – by default we disabled any kind of backpressure in this type basically by doing this – but if people really like it so be it.