Swift Async Algorithms Proposal: AsyncBackPressuredStream

Hello everyone,

I would like to propose my previous SE-0406 BackPressure support for AsyncStream for inclusion in swift-async-algorithms. You can find the full revised proposal and implementation in my PR.

The important changes to the SE proposal are:

  • Introduce a new Async[NonThrowing]BackPressuredStream type in AsyncAlgorithms instead of adding new factory methods to Async[Throwing]Stream.
  • Add custom element size calculation for the watermark strategy. This is useful when the Element of the stream is a collection, since it allows the count of each collection's elements to be taken into account for the watermarks.
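To illustrate the second point, here is a minimal, self-contained sketch of how a per-element size function changes the water level. `WatermarkModel`, `didYield`, and `didConsume` are hypothetical names made up for this illustration; they are not the types or methods from the PR, and the exact threshold comparisons are an assumption.

```swift
// Hypothetical stand-in for a watermark strategy; NOT the type from the PR.
struct WatermarkModel<Element> {
    let low: Int
    let high: Int
    // Maps an element to its contribution to the water level.
    let waterLevelForElement: (Element) -> Int
    private(set) var currentLevel = 0

    init(
        low: Int,
        high: Int,
        waterLevelForElement: @escaping (Element) -> Int = { _ in 1 }
    ) {
        precondition(low <= high, "low must not exceed high")
        self.low = low
        self.high = high
        self.waterLevelForElement = waterLevelForElement
    }

    /// Records a produced element. Returns true while the producer may keep producing.
    mutating func didYield(_ element: Element) -> Bool {
        currentLevel += waterLevelForElement(element)
        return currentLevel < high
    }

    /// Records a consumed element. Returns true once a suspended producer should resume.
    mutating func didConsume(_ element: Element) -> Bool {
        currentLevel -= waterLevelForElement(element)
        return currentLevel < low
    }
}

let chunk: [UInt8] = [1, 2, 3, 4, 5]

// Default: every chunk counts as 1, regardless of how many bytes it holds.
var perChunk = WatermarkModel<[UInt8]>(low: 2, high: 4)
let perChunkCanContinue = perChunk.didYield(chunk)

// Custom size: every chunk counts as its byte count.
var perByte = WatermarkModel<[UInt8]>(low: 2, high: 4, waterLevelForElement: { $0.count })
let perByteCanContinue = perByte.didYield(chunk)
```

With the default, one five-byte chunk leaves the level at 1 and production continues; with the custom size function the same chunk pushes the level to 5, past the high watermark of 4, so the producer would be asked to stop.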

Would love to hear what people think about the inclusion and the changes!

Franz

20 Likes

Excited to see this resurface again!

This is super useful for us, good addition!

2 Likes

I agree that while SE did return this for revision, this is very much needed anyway and good to include in the async-algs library, where it can be evolved more easily if the need were to arise. I think the primary SE feedback was along the lines of concern about whether it handles all possible cases.

4 Likes

I think it is a great idea to add this to the async algorithms package!

These building blocks for "correct" async data flow are easy to get wrong, so it is great to have an Apple-backed, central place for their implementations - with this one being an absolute essential in my eyes.

While I agree with the SE pitch about the unfortunate state of AsyncStream in the stdlib, I might actually prefer the package approach. To me the freedom to evolve and adjust this at its own pace outweighs the (very manageable) downsides of depending on one (tightly controlled and versioned) package.

2 Likes

Given that the use case for custom element size watermarks would very often just be the count of a collection, would it make sense to have a convenience extension for collections?

E.g. something like:

extension AsyncNonThrowingBackPressuredStream.Source.BackPressureStrategy where Element: Collection {
    /// A backpressure strategy using a high and low watermark to suspend and resume production respectively.
    ///
    /// - Parameters:
    ///   - low: When the number of buffered elements drops below the low watermark, producers will be resumed.
    ///   - high: When the number of buffered elements rises above the high watermark, producers will be suspended.
    ///   - waterLevelForCollectionCount: If true, each collection element contributes its `count` to the current water level;
    ///     otherwise each collection counts as one toward the water level.
    public static func watermark(
        low: Int,
        high: Int,
        waterLevelForCollectionCount: Bool
    ) -> AsyncNonThrowingBackPressuredStream.Source.BackPressureStrategy {
        if waterLevelForCollectionCount {
            return .init(
                internalBackPressureStrategy: .watermark(
                    .init(low: low, high: high, waterLevelForElement: { $0.count })
                )
            )
        }
        return .watermark(low: low, high: high)
    }
}

The functionality of these seems like a nice wrap-up of some of the original parts of the AsyncStream pitch/review cycle that didn't get a follow-on update. So to that part of it I say it looks great and fits quite nicely into the rest of the algorithmic side of things.

However, the names are in need of some elegance in my view (I know... opening up the bike shed discussion...). In particular, these are types representing a custom or external back pressure system. By nature all AsyncSequences handle backpressure in some way or another. The special part about these types, in my view, is that they expose it in a cohesive way as an externally connectable system. So I think it really should have the full name AsyncExternalBackpressureStream, or something along those lines, to reinforce the concept that it is intended to be used as that external connection.

Also along the lines of naming: the NonThrowing is an odd duck compared to other things that currently exist. I would guess your reasoning is to name it nicely for a transition to typed throws. But once that lands, is the intention to deprecate the NonThrowing version and refine the bare named one accordingly?

Since typed throws is coming along nicely, perhaps we can just avoid the non-throwing variant altogether for now and not worry about the churn, since using try on a non-throwing function (even iteration) should "just work" (with a warning).
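For reference, a small sketch of the "just works with a warning" behaviour. `nextValue` and `consume` are hypothetical names used only for this illustration, standing in for a non-throwing iteration step and its caller.

```swift
// Hypothetical non-throwing function, standing in for a non-throwing `next()`.
func nextValue() -> Int { 42 }

func consume() throws -> Int {
    // This compiles, but the compiler emits a warning along the lines of:
    // "no calls to throwing functions occur within 'try' expression"
    return try nextValue()
}

let value = try consume()
```

So call sites written with try against a throwing API would keep compiling if the API later became non-throwing, at the cost of a warning at each site.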

2 Likes

I initially had this overload but decided against adding it since count on Collection might be O(n) and calculating the watermark must be fast since it runs while we are holding the lock. Hence, I decided not to provide any conveniences here and let the user explicitly state how the watermark is calculated.
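As a concrete case of a non-constant-time count, here is a small sketch using a lazily filtered collection from the standard library, which has to run its predicate over the elements in order to count them. The helper name `countEvens` is made up for this illustration.

```swift
// Counts the even numbers in 0..<1_000 through a lazy filter and records
// how many times the predicate had to run to answer `count`.
func countEvens() -> (count: Int, predicateCalls: Int) {
    var calls = 0
    let evens = (0..<1_000).lazy.filter { n in
        calls += 1
        return n % 2 == 0
    }
    // `evens` is a Collection, but it is not random access:
    // computing `count` walks the underlying range.
    let count = evens.count
    return (count, calls)
}

let result = countEvens()
```

Here a single `count` call evaluates the predicate at least once per underlying element. Doing that kind of work for every yielded element while holding the lock is the cost the explicit closure is meant to make visible.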

I am not tied to the AsyncBackpressuredStream name and more than happy to add External into the name.

Yes, the intention is to deprecate the NonThrowing type and just use the bare named one. After working on the Typed throws in the Concurrency module proposal I realised that our naming decision in the stdlib is making the transition to typed throws more complicated, so I wanted to avoid that for this proposal.

Typed throws are coming along nicely, but realistically they will require one of the next Swift releases. I don't want to tie this proposal to a new Swift version, since those types can happily be used in older Swift versions as well, except for the awkward NonThrowing name.

3 Likes

Perhaps it’s incorrect, but don’t you think most collections used in this context would have a time complexity of O(1) for count? For those that don’t, one could always override the convenience default?