Kickoff of a new season of development for AsyncAlgorithms; Share

In general, I'm excited to see this moving forward. I agree with the concerns around documenting the exact behavior that @tcldr brought up.

I don't think this is 100% accurate. The proposed share algorithm does have back pressure depending on what buffer strategy you use.

Since you are proposing to re-use the existing AsyncBufferSequencePolicy, there is one policy missing, which is the bounded one. In the share case this is essentially the "slowest consumer dictates the speed" policy: the internal buffer accumulates elements until it reaches the bound, and then the slowest consumer dictates when the next element is requested from the upstream.
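For illustration, a hedged sketch of what that might look like; whether .bounded needs to be added to AsyncBufferSequencePolicy or merely wired up for share is exactly what this post raises:

let shared = upstream.share(bufferingPolicy: .bounded(16))
// At most 16 un-consumed elements are buffered. Once the buffer is full,
// the next element is requested from the upstream only when the slowest
// consumer makes room, i.e. the slowest consumer dictates the speed.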

I don't think unbounded is the right default here, generally speaking. For server use-cases it is almost never the right choice. I would propose that we drop the default value here and force users to make a choice. This makes it easy to grep and lint for. I am appreciative of the progressive disclosure argument, but this is a general API for all of Swift's use-cases and it shouldn't favor one over the other.

Why does it need to be a detached task and not just an unstructured task? Task {} should be enough here, really. In general, it is sad that we have to use an unstructured task here at all, but the current shape of the AsyncSequence protocol is not compatible with Swift's structured concurrency. We already had to resort to unstructured tasks in merge and other algorithms. Hopefully this is something we can address in the future.

FWIW, I would like to see such a protocol as well, though shaped a bit differently and using the latest language features. However, I consider this outside the scope of this proposal. My current prototype looks like this:

protocol AsyncWriter: ~Copyable, ~Escapable {
  associatedtype Element: ~Copyable, ~Escapable
  associatedtype Failure: Error

  func write(_ elements: Span<Element>) async throws(Failure)
}

My proposed MultiProducerSingleConsumerAsyncChannel is related but orthogonal. It provides a channel that backs an async sequence but only allows a single consumer. It would compose with the proposed share so that you can effectively get a multi-producer-multi-consumer async sequence.

In my personal opinion, I think it is okay if the initial algorithm starts with only the proposed cancellation behaviour: the upstream iteration is cancelled only once all consumers, and any source of consumer creation, have gone away. We can always expand this in the future, but this seems to me like what 95% of the use-cases want.

5 Likes

Ahh… ok… that makes more sense. I wasn't clear that methods added in an extension followed different rules about specialization. So the idea is we add a member method through an extension when product engineers compile against AsyncAlgorithms… but any share methods defined directly on concrete types would not get called through a generic context over AsyncSequence.

I guess it would also be possible to ship share as a free function that takes an AsyncSequence as a parameter… but I have not thought through all the pros and cons of that idea vs a member method defined in an extension.

So the idea is we add a member method through an extension

Exactly. It's a normal member function on AsyncSequence — statically bound and not dispatched through a witness table.

when product engineers compile against AsyncAlgorithms...

That's supposed to be the standard case.

any share methods defined directly on concrete types...

... would shadow the method in the AsyncSequence extension.

I guess it would also be possible to ship share as a free function

As you said, in this case the AsyncSequence must be provided as an explicit argument. A member function feels more natural.
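To make the dispatch behavior concrete, here is a minimal self-contained sketch with toy names (not the real AsyncAlgorithms declarations):

protocol Sequenceish {}

extension Sequenceish {
  // Not a protocol requirement: statically bound, no witness-table entry.
  func share() -> String { "extension share" }
}

struct Concrete: Sequenceish {
  // Shadows the extension method whenever the concrete type is visible.
  func share() -> String { "concrete share" }
}

func generic<S: Sequenceish>(_ s: S) -> String {
  s.share() // always resolves to the extension method
}

print(Concrete().share())  // prints "concrete share"
print(generic(Concrete())) // prints "extension share"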

1 Like

My overall concern is the following: up to this point, any cancellation of an AsyncSequence has had only local effects (the looping task itself got cancelled) or upstream-to-downstream effects (the upstream finished or errored).

If there's a cancellation mode like .cancelOnFirstDownstreamCancel (which I'd also implement as a similar enum if I didn't have this concern, btw, no objections here), we're getting another option: any downstream consumer can asymmetrically and unequivocally decide whether any other independent consumer succeeds or not.

This can be really tricky to reason about if (as it currently stands) share in either mode only returns some AsyncSequence, especially over API boundaries: a client of a library who can only see some AsyncSequence now has to be permanently paranoid about whether cancelling a task is "safe" in the sense of whether it will or will not globally affect other downstream tasks of the program. A lot of existing code likely considers cancelling looping tasks safe in this regard too, so if a dependency updates to the new AsyncAlgorithms and implements this behind the scenes, it could break a lot of code.

4 Likes

Per the discussion around cancellation: it is worth noting that the behavior is that a side being cancelled only terminates that particular side and does not forward to the upstream sequence, because that would terminate an unrelated side's iteration.

func iterateInTwoTasks(_ upstream: some AsyncSequence) -> some AsyncSequence & Sendable {
  let shared = upstream.share()
  let t1 = Task {
    for await element in shared {
      print("t1 \(element)")
    }
  }
  let t2 = Task {
    for await element in shared {
      print("t2 \(element)")
    }
  }
  t1.cancel()
  return shared
}

So given that example: we would not want t2 to stop just because t1 got cancelled. So it cannot "forward" to the backing iteration of the upstream. It would be expected that fewer t1 elements are printed than t2 elements (and perhaps no t1 elements at all).

That being said, however, if the shared return value is still alive, someone might come along and add a "t3" to iterate it. So it also cannot be the case that cancellation of both t1 and t2 causes an upstream cancellation, since a new "side" may be added at any point in time.

Currently, however, if t1 is cancelled and t2 is cancelled AND there is no longer any lifetime attached to shared, it will cancel, because no future values could ever be shared and the memory associated with it should be cleaned up. Not doing so would mean a potential leak.
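A sketch of those lifetime semantics, assuming the pitched share() (with its currently defaulted buffering policy) and a Sendable upstream of Sendable elements:

func observe(_ events: some AsyncSequence<Int, Never> & Sendable) async {
  let shared = events.share()

  let t1 = Task { for await e in shared { print("t1", e) } }
  let t2 = Task { for await e in shared { print("t2", e) } }

  t1.cancel()     // terminates only t1's side; t2 keeps receiving elements
  await t1.value

  t2.cancel()     // no side is iterating now, but `shared` is still
  await t2.value  // referenced here, so a t3 side could still be added

  // Once `shared` itself is released and no sides remain, the upstream
  // iteration is cancelled and the associated memory is reclaimed.
}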

5 Likes

FWIW, I think this is the right cancellation behavior for this algorithm. Anything else where cancellation of one side influences the cancellation of another can lead to unexpected behavior in sometimes unrelated places.

1 Like

Yes and no, it can totally be a behavior you actively want — if you’re processing some frames in parallel and you need all the frames to be successful, it absolutely makes sense for one failure to cancel the whole stream. It is very much use case dependent.

Not cancelling upstream and siblings is the right default, but it is not the only useful behavior a fan-out operation can have. Both make sense in certain situations.

7 Likes

I agree that what you describe would make a lot of sense, and I would appreciate the possibility of changing the cancellation policy.

But the current share operator's goal is to share the same data with every consumer for independent processing (e.g., one updates the UI, another logs changes, the last one backs up data over the network). These are unrelated operations. You typically don't want them interacting.

Your own example (each consumer processing a single frame in parallel) wouldn't be a good use case of the current share operator, since we would need additional synchronization to ensure each frame is processed exactly once.

But that's something to keep in mind, though. If we later have another operator that load-balances, cancelling one consumer could be intended to cancel all. However, this would also make recovery/retry impossible. So, yes, that might be a good default there, but we'd still want flexibility.

With the addition of the .bounded(N) option to the buffering, we kinda have to have a cancellation option. Because if someone were to set that to, say, 1, the expected behavior would be that all sides must have been resumed with a value before progressing to the next (which previously I would have claimed as a separate algorithm, but it seems that can currently be encapsulated with .share(bufferingPolicy: .bounded(1)), where the buffer cannot hold more than one value).

/// A policy for buffering elements until the limit is reached.
/// Then consumption of the upstream AsyncSequence will be paused until elements are consumed from the buffer.
/// If the limit is zero then no buffering policy is applied.

So that means that if the buffering policy were set to a bound of 1, the sides would act as if they were participating in a broadcasting behavior: each side's progression would entail joint influence on the others. Meaning that when the buffer is set to a certain mode, a mutual cancellation may well be the desired behavior.

If we are still to have other sharing operations, such as replay etc., in the future, we should probably add an AsyncSharedSequenceCancellationPolicy parameter, changing the signature to be:

public func share(bufferingPolicy: AsyncBufferSequencePolicy, cancellationPolicy: AsyncSharedSequenceCancellationPolicy) -> some AsyncSequence<Element, Failure> & Sendable

And the policy to be defined something like:

public struct AsyncSharedSequenceCancellationPolicy: Sendable {
  // Cancels on the first cancellation issued to any side during iteration
  public static var firstCancellation: Self { get }

  // All cancellation of sides do not forward to the base iteration
  public static var suppress: Self { get }

  // Cancel on the first iterated termination such that other sides that might be behind or not yet iterated get a terminal event immediately
  public static var firstIteratedTermination: Self { get }

  // Cancel the base iteration when no more sides can be created and no sides are left iterating
  public static var lifetime: Self { get }
}

And that would then make the current signature have a default of .lifetime.
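Hypothetical usage under that proposed (not yet accepted) signature, combining the .bounded(1) lock-step buffer discussed above with an explicit policy:

let shared = upstream.share(
  bufferingPolicy: .bounded(1),
  cancellationPolicy: .firstIteratedTermination
)
// With .lifetime instead (the suggested default), the upstream would only
// be cancelled once no sides remain and no new sides can be created.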

1 Like

I am not sure I follow that reasoning. Just because there is a bounded buffer does not mean that consumers are influencing each other's cancellation. The bounded buffer is more about guaranteeing that no elements are dropped while also having an upper bound on memory consumption.

I personally think that we should subset out any cancellation policy into a separate proposal to keep the scope of this as small as possible. I understand the need for such a policy but I would encourage us to try to focus on the 90% use-case first and expand from there.

3 Likes

Yeah, something like this. We can bikeshed the exact wording of the cases; it boils down to acting on the first cancellation, on a number of cancellations, or ignoring cancellation as the lifetime or suppress policies do.

I personally think that we should subset out any cancellation policy into a separate proposal to keep the scope of this as small as possible. I understand the need for such a policy but I would encourage us to try to focus on the 90% use-case first and expand from there.

As long as that follow-up actually happens, that's agreeable. The current proposal, however, must be explicit about documenting the cancellation behavior it will exhibit.

3 Likes

I think that is reasonable to keep on the table for later.

Per the other policy:

I feel like that is a big ask for folks to consider right out of the gate when bringing something up; however, I agree that in the end folks will likely make some sort of decision one way or another.

I'm ok if that is the last request preventing consensus, but it definitely is a hit on the usability/ergonomics front.

I wrote a similar implementation a while ago for reusing a NIO response body.

Reference swift-lazy/Sources/LazyKit/AsyncCachedSequence.swift at main · Myoland/swift-lazy · GitHub

Generally, non-defaulted parameters should be avoided if there's no good way for users to come up with a reasonable value on their own. In this case, unless you're an expert in async streams or reactive systems, there's no way to know whether you'd want an infinite buffer or a specific bound. Therefore it should default to the unbounded value, and those experts who know how to calculate a proper buffer value can provide it.

For app developers there's almost never a reason not to use an unbounded buffer, and as an experienced developer of 20 years, I really have no idea what a different reasonable value would be for that use.

As an aside, can we streamline some of this design litigation by starting from a Combine or Rx point and deviating only when the concurrent nature requires? This is going to take forever if every API needs to be relitigated practically from scratch.

8 Likes

Strongly disagree. This would mean that in every case where you care about correctness you must be careful never to use a default parameter. That is an extremely bad default.

2 Likes

If there are incorrect parameter values then they shouldn't be parameters in the first place, and if they must be, then the incorrect version shouldn't even be possible. But an unbounded buffer isn't really incorrect; it's just slightly dangerous in the narrow case of a high-input system that worries about DDoS. Of course, for most use cases this isn't actually incorrect, and in typical usage an unbounded buffer will never be an issue. You shouldn't have to be an expert to use these APIs. If that makes these APIs unsuitable for backend use then it seems like that environment should have its own library, rather than forcing the 90%+ of Swift developers who will never need to care to mindlessly fill in default values.

2 Likes

I strongly disagree with this assessment. First, unbounded is not only a concern for server applications, due to the potential for denial-of-service attacks, but also for other use-cases such as constrained systems, including modern mobile applications. You don't want your iOS application terminated because a random asynchronous sequence, potentially outside of your control, forgot to change the buffer policy. Secondly, Swift is all about correctness and safety first, and only then about progressive disclosure. In my opinion, this applies here as well.

If we were to choose a default value then something like bounded with a small value like 10 seems a lot safer.

2 Likes

To add some clarity: just because the buffer's limit is set to a certain value does not mean it pre-allocates that amount for the buffer. So in the case of .unbounded it is isomorphic to .bounded(.max).

And if you have an issue that would be fixed by either a dropping policy or a bounded policy, it will likely also be an issue due to some buffer you have zero control over, like, for example, the buffer underlying the execution of blocks on a dispatch queue (which is also unbounded). So even .bounded(N) has a hidden unlimited buffer present.

Choosing abstract values to bound to is not a good default, because that just leads to inconsistent behavior that is hard to track down. Whereas an unlimited default at least leaves instrumentation open as a potential route.

All that being said, I would rather have .share(bufferingPolicy: .unbounded) as the spelling than no .share() algorithm at all, and I would imagine that many developers share that opinion. To that end, I am going to change the pitch to remove the default parameter; if folks absolutely want one they can add it themselves. I am going to guess that some folks may have that as a common extension, and that is ok because they can take responsibility for the impact on their app accordingly.

That just means most developers either litter their codebase with a thoughtless default, or they create their own .share() to give them the API surface they want, both of which are net negatives to the DX.

3 Likes

Wouldn't that just silently drop values? That can be a far worse result than memory ballooning slightly (though that doesn't really happen in apps either), because you may miss critical, including security-critical, values. If we choose an arbitrarily small limit, how are users supposed to detect when values were dropped?

2 Likes