Nested async sequences: questions about interleaving & buffering

Hello,

I'm about to nest two async sequences:

// Expected output:
// foo 1
// - bar 1.1
// - bar 1.2
// foo 2
// - bar 2.1
for await foo in foos {
  print("foo", foo)
  for await bar in bars {
    print("- bar", bar)
  }
}

(Real sequences will be StoreKit sequences, so I really don't want to mess with them and rely on wrong logic.)

Since I'm not yet familiar with Swift Concurrency, there are two questions that are nagging me:

  1. Am I sure that loops are not interleaved? in other words, that I won't handle a foo until all bars from the previous foo are consumed?

    foo 1
    - bar 1.1
    foo 2
    - bar 1.2 ❌
    - bar 2.1
    ...
    

    I could not witness such an interleaving with my test sequences (AsyncStream fed from a Foundation.Timer, and Foundation.FileHandle.AsyncBytes), but I'm not 100% sure yet.

    So: is this behavior guaranteed by the language, or should I write defensive code against interleaving?

  2. Am I sure that foos are buffered as long as bars from previous foos are iterated?

    foo 1
    - bar 1.1
    - bar 1.2
    foo 3 (❌ foo 2 was missed!)
    - bar 3.1
    ...
    

    Again, I could not witness such data-loss with my test sequences, even with an AsyncStream with an bounded buffering policy. I'm still unable to know if was just lucky, or if I just didn't find the sequences that would exhibit the issue.

    So: is data-loss prevention guaranteed by the language, or should I write defensive code against it?

On the first question, "can loops interleave?", the answer is clearly NO.

I could convince me on this by imagining that the inner loop is wrapped in an async function:

for await foo in foos {
  print("foo", foo)
  await consumeBars()
}

func consumeBars() async {
  for await bar in bars {
    print("- bar", bar)
  }
}

Clearly the outer loop can't progress until a step has completed, i.e. until all bars have been iterated.

So remains only the question about data loss / buffering. I'm more and more convinced that buffering must occur, and again with the thought experiment of wrapping the inner loop in an async function:

for await foo in foos {
  // Regardless of the implementation of f,
  // the language is not allowed to skip any element
  // from the async sequence.
  await f()
}

My intuition was misguided by the fact that AsyncStream has a buffering policy. I'm still somewhat confused about "who" is responsible, in the language and the standard library, for buffering elements that wait to be processed? I guess I'll grasp a better understanding eventually.

Meanwhile, I guess I have no question left.

It depends on the implementation of the type that conforms to AsyncSequence. AsyncStream is one example where buffering happens and items can be dropped. Often this would not happen because an async sequence would typically fetch the next items on-demand and not continuously receive them regardless of if there is someone to consume them.

1 Like

Oh, thank you for fixing my mistake!

All right, a plain for await element in elements { await doStuff() } is generally able to to drop elements. I'll have to swallow this, even if I can understand that unbounded buffering by default would have been too dangerous.

My first sample codes with AsyncStream were clearly wrong somewhere. I can now clearly see that values can be dropped:

sample code
import _Concurrency
import PlaygroundSupport
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

func test(bufferingPolicy: AsyncStream<Date>.Continuation.BufferingPolicy) {
    // Yield one date every second
    var continuation: AsyncStream<Date>.Continuation?
    let timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true, block: { _ in
        continuation?.yield(Date())
    })
    let stream = AsyncStream<Date>(bufferingPolicy: bufferingPolicy) { continuation = $0 }
    Task {
        for await date in stream {
            // Print the date and wait 2 seconds
            print(bufferingPolicy, date)
            try await Task.sleep(nanoseconds: 2_000_000_000)
        }
    }
}

This gives:

// Unbounded buffering: no value is dropped
// unbounded 2022-02-10 09:08:45 +0000
// unbounded 2022-02-10 09:08:46 +0000
// unbounded 2022-02-10 09:08:47 +0000
// unbounded 2022-02-10 09:08:48 +0000
// unbounded 2022-02-10 09:08:49 +0000
// unbounded 2022-02-10 09:08:50 +0000
// unbounded 2022-02-10 09:08:51 +0000
test(bufferingPolicy: .unbounded)

// Bounded buffering: some values are dropped
// bufferingOldest(1) 2022-02-10 09:08:45 +0000
// bufferingOldest(1) 2022-02-10 09:08:46 +0000
// bufferingOldest(1) 2022-02-10 09:08:48 +0000 // 47 missing
// bufferingOldest(1) 2022-02-10 09:08:50 +0000 // 49 missing
// bufferingOldest(1) 2022-02-10 09:08:52 +0000 // 51 missing
test(bufferingPolicy: .bufferingOldest(1))

Sort of, but also not really.

An AsyncSequence is a thing that you can use in a for await loop - i.e. you await for an element, it gives you an element, you await another element, it gives you another element, etc (or it completes). That is quite literally all it defines. It doesn't say anything about what those values are, whether you get the same elements every time, etc. The sequence decides what its elements are, based on which functionality it is trying to provide.

The possibility of dropping elements is not a feature of the language, or the for await syntax; it is a feature of the conrete AsyncSequence you happen to be using. That sequence is receiving elements from some external source, and has decided that not all elements are important, and that you don't need to know about all of them. Only it can make those decisions.

When consuming an AsyncSequence, my advice would be to completely ignore any possibility of dropping elements. That's a low-level implementation detail of the sequence that you shouldn't need to care about. In particular, don't try to implement your own buffering out of fear of dropping events (if you need to buffer, by all means do it, but don't let this be the reason).

So saying "for await element in elements is able to drop" is not entirely true. It's the elements sequence itself which may decide not to yield every element it possibly could. for await processes every element the sequence decides to yield and by itself will not drop anything.

4 Likes

I see what you mean, but sometimes one must be able to positively assert that a program is behaving as required.

The sequences I have to deal with are Transaction.currentEntitlements and Transaction.updates, which deal with App Store purchases. You wouldn't want me to ignore the possibility of dropping your own purchases.

So we need an accurate depiction of async sequences iteration. From this accurate description, users can decide if they can risk dropping elements or not, and if they can afford dropping them or not.

The documentation of the StoreKit sequences I mentioned do not describe their behavior w.r.t. this question. How is one supposed to write bug-free code, without assuming the worst that is permitted by the language?

I will thus not "completely ignore any possibility of dropping elements". I'll keep asking questions, challenge the documentation, look for adversarial scenarios, test hypothesis, look for lovely help in these forums, and avoid excess of optimism from shipping avoidable bugs.

What you're saying is the equivalent of "what if Transaction.updates doesn't include the updates it is supposed to include?"

In that case, there's literally nothing you can do. It has nothing to do with what is permitted by the language. What you are worried about is the equivalent of a sequence of even numbers containing an odd number due to an implementation bug.

Except in this case, you're not worried about an extra element; you're worried about a missing element. An element you won't see. There's literally nothing that can be done about that. It would be a bug in the framework and you'd have to report it and wait for a fix.

By all means, then - if you think it is an "excess of optimism", try to handle the possibility of implementation bugs in the frameworks leading to you not seeing certain events. Because that seems to be what you are worried about.

As I said previously, AsyncSequence and for await do not allow any more events to be dropped than regular Sequence or for loops. The question of whether an event should be delivered or not is a matter for the sequence implementation. You must assume that they will deliver all the events they are supposed to deliver, according to the functionality they are trying to implement.

1 Like

Of course. I don't know how an iterator is implemented. Assuming that the implementor was nice enough to buffer elements until I request them with AsyncIterator.next() is what I call an excess of optimism.

The only generally optimistic position I can live with is the synchronous iteration:

// I'm sure no element is dropped
for await element in sequence {
    print(element)
}

Insert an await in the loop and all bets are generally off:

// I'm not sure no element is dropped.
for await element in sequence {
    await process(element)
}

Only specific sequences, whose type is known, and behavior properly documented, can change this.

You must assume that they will deliver all the events they are supposed to deliver, according to the functionality they are trying to implement.

When Apple starts fixing their bugs in a couple of days, I'll be able to afford to understand your MUST as a MIGHT :sweat_smile:

My two cents is that this is an API design and documentation issue, rather than a syntactical or semantic concern of the concurrency features itself.

If the API vendor expects the API user to have to process every single element being emitted, they should have vended an AsyncSequence and AsyncIteratorProtocol implementations that provides that exactly once guarantee, and document it as such. Be it AsyncStream with unbounded buffering, or a bespoke implementation.

This is also a runtime behaviour concern that is probably solvable only by QA and documentation, since we are working with live time series data. Say an API promises to vend an array with no less than N elements, but we worry that it sometimes gives you less than N in practice. For sure we can defensively code around it to establish some confidence of forward progress within what we can control. But if the API vendor does end up violating part of an API contract, can our program survive on the burning platform anyway? So in the end, it has to go back to the API vendor to document and maintain them properly.

Insert an await in the loop and all bets are generally off:

If the async sequence suspends any further work until you request the next element (e.g., async file iteration), or if the async sequence does unbounded buffering, there shall be no issue. But as mentioned, this is a choice to be made by the API vendor, on what runtime guarantees they wish/have to provide.

2 Likes

More seriously, I see that you try to advise to not make a program more complex than necessary, because of ungrounded paranoia. And you're right! :100:

Here I deal with sequences whose behavior is undocumented, and contain purchase information. I feel entitled for a reasonable amount of paranoia. And I'm sure many readers of the forum have learned something about async sequences.

An async sequence is not a sequence. It's obvious for some, but never stated enough for many. They do not behave the same, despite the familiar for ... in ... syntax. If one brings too much luggage from one's experience with Sequence, one might be surprised.

How sure are you? Synchronous sequences can be generated on-demand, just as async sequences may be. What's to stop an implementation from skipping elements?

The only time you can be sure is if the sequence is (from) a collection, such as Array or Dictionary. Those don't drop elements by construction. (It would be an undeniably world-breaking bug if those started skipping or dropping elements.)

2 Likes

Quite frankly you can worry about the exact same thing with synchronous sequences: how do you know that a sequence is not buffering and dropping elements by the time you finish your loop body and ask for the next element? This could happen too.

2 Likes

I think this is where the core misconception lies. Async sequences are just like sequences.

Just because you need to await for each element, that does not allow the sequence to drop elements. It doesn't. It just doesn't.

When you mention "dropping" elements, it's important to be precise. What is actually happening is backpressure management - i.e. the sequence (synchronous or async) has data which is just accumulating at a rate higher than the consumer is removing it, so the sequence decides on its own that it's safe to not return those elements.

Imagine, for example, a synchronous iterator of streaming video frames:

class VideoFrameIterator: IteratorProtocol {
  var buffer: [VideoFrame]

  func next() -> VideoFrame? {
    // 1. Check how many frames are in the buffer.
    guard buffer.count > 100 else { return buffer.removeFirst() }

    // 2. If we have lots of old video frames, discard them.
    buffer.removeFromFront(where: { $0.age > 30 * 1_second })

    // 3. Remove and return the oldest frame.
    return buffer.removeFirst()
  }
}

Here, it's doing what you call "dropping elements". It has data in its buffer, but it decided on its own that the data isn't useful and it would rather discard them instead of showing you video frames from 10 minutes ago.

This would make sense for streaming video, but maybe not for all things, and maybe the streaming library will also include iterators which don't implement this behaviour. But if it tells you it returns the video frames you need to display, then it returns the video frames you need to display. That's all there is to it, and anything else is a bug that can't be fixed except by the library developer.

Async sequences are not allowed to drop elements just because they are async. They can implement backpressure management, just like synchronous sequences can, but by definition that should not drop any important information.

5 Likes

Thanks Karl, I'll remember this. And when a sequence does what it is not allowed to do (because "not allowed" does not mean "impossible"), I'll know that a bug has to be reported.

2 Likes