Is it possible to look ahead when processing AsyncSequence?

I'm trying to implement a simple scenario using AsyncSequence. It's a variation of the producer/consumer pattern. The scenario:

(Suppose there is a value in my code)

  • Requirement 1 (basic): when the value changes, I'd like to save its new value in a file. The save operation should be performed asynchronously so it doesn't block other code.

  • Requirement 2 (advanced): let's suppose that saving a value to file may take a noticeable amount of time, and during that period there may be multiple new values coming in. In a typical producer/consumer pattern, all new values will be saved one by one. In my case, however, I'd like to discard the intermediate values and only save the newest one.

The first requirement works well enough in practice (that's what I actually use in my app). But I'm curious how to implement the second requirement using AsyncSequence? I have thought about it for a while, but can't find a way.

  • If I implement the requirement on the consumer side, I'll need some async algorithm that is able to look ahead. As far as I can tell, there is no such API.

  • If I implement the requirement on the producer side, given that I should yield a value as soon as it changes (otherwise I would introduce a delay between the time a value changes and the time the consumer sees it), I'll need some way to remove the pending elements in the async sequence (by pending I mean elements that haven't been consumed yet). As far as I can tell, there is no such API either.

Am I missing something? Is it possible to implement the requirement using Swift concurrency?

PS: for people who'd like to do experiments, below is example code which implements the basic requirement.

Example code
import Foundation

actor FileService {
    static let shared = FileService()

    let url = URL.documentsDirectory.appending(path: "myfile")
    let (datum, cont) = AsyncStream.makeStream(of: Data.self)

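    init() {
        // A single long-lived task drains the stream, so writes happen one at a time, in order.
        Task {
            for await data in datum {
                // Note: a thrown error ends this task (and all later saves) without being reported.
                try data.write(to: url, options: .atomic)
            }
        }
    }

    // Enqueue data for writing; returns without waiting for the write to happen.
    func save(data: Data) {
        cont.yield(data)
    }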
}

struct Foo {
    var value: Int = 1 {
        didSet {
            Task { [value = self.value] in
                let data = try JSONEncoder().encode(value)
                await FileService.shared.save(data: data)
            }
        }
    }
}

func test() {
    var foo = Foo()
    foo.value = 2
    sleep(6)
}

test()

I think you're looking for debounce

But, as you say, this does introduce a delay. I don't see any way to do this without a delay, except in one very specific sub-case: when multiple values are enqueued synchronously.

Personally I would not be too concerned about a short delay here; if you aren't using fsync with the full flush argument (and please don't do that, in general) there'll be a delay before the state on disk is durable anyway.

Throttle might be another good alternative. But the last time I checked, the implementation didn't quite meet the specification.

I don't think what you are looking for works out of the box with AsyncSequence. You should be able to add your second requirement to a wrapper like AsyncQueue, though: AsyncQueue | Woody's findings
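One thing that may be worth checking before writing a wrapper: the standard library's `AsyncStream.makeStream(of:bufferingPolicy:)` takes a buffering policy, and `.bufferingNewest(1)` is documented to discard the oldest buffered element when the buffer is full. The sketch below is only an experiment under that assumption, not something proposed in this thread; `collectNewest` is a made-up name. It yields three values before anything consumes them and then collects what is left, which should be only the newest value.

```swift
// Sketch: yield several values while nothing is consuming, then see what survives.
func collectNewest() async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingNewest(1)  // keep at most one pending element
    )

    continuation.yield(1)
    continuation.yield(2)  // the buffer is full, so the older pending element is dropped
    continuation.yield(3)
    continuation.finish()  // buffered elements are still delivered after finish()

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

If this behaves as documented, a consumer that is busy writing one value would find only the most recent value waiting when it comes back for the next element, with no interval involved.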

Note that on every value change (e.g. 10_000 times per second) you are spawning a task and encoding the value to JSON (again 10_000 times per second). If you debounce after that, most of the resulting JSON is thrown away, which is quite wasteful. I'd try to find a design that debounces at the earliest opportunity, perhaps without restricting the design to AsyncStream at first, if it gets in the way.


Thanks @David_Smith and @pyrtsa for your suggestions on debounce and throttle. The reason I don't like them is that they require an interval value. I was probably not clear on my requirements, so let me try again:

  • The purpose of saving the value to file is persistence.

  • Only the newest value matters. The old value is overwritten when a new value is saved to the file.

  • Because a user may switch the app to the background or kill it at any time, it's important to save a new value as soon as possible. That's why I don't want to introduce a delay.

  • Value changes are caused by user interaction in my app, so they are unlikely to be very fast. That's why it works well to just implement the basic requirement. I was trying to implement the advanced requirement as an exercise.
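The requirements listed above (save immediately, keep only the newest value, no interval) could be sketched without any AsyncSequence at all. This is a minimal illustration, not code from the thread: `LatestValueSaver`, `submit`, and the `save` closure are hypothetical names, and the closure stands in for the encode-and-write step.

```swift
// Hypothetical coalescing saver: at most one save runs at a time, and values
// that arrive while a save is in flight overwrite each other.
actor LatestValueSaver<Value: Sendable> {
    private var pending: Value?   // newest value not yet handed to `save`
    private var isSaving = false  // true while the drain loop is active
    private let save: @Sendable (Value) async -> Void

    init(save: @escaping @Sendable (Value) async -> Void) {
        self.save = save
    }

    func submit(_ value: Value) {
        pending = value           // replaces any older unsaved value
        guard !isSaving else { return }
        isSaving = true
        Task { await drain() }    // start saving right away, no interval
    }

    private func drain() async {
        while let value = pending {
            pending = nil
            // Suspension point: `submit` may run here and set `pending` again.
            await save(value)
        }
        isSaving = false
    }
}
```

The first value after an idle period is saved immediately. Values submitted while a save is running replace one another, so only the newest is picked up once that save finishes. Doing the JSON encoding inside `save` would also avoid encoding the values that get discarded.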

Thanks for the pointer. I think that's what I was looking for. I considered a similar approach, though I didn't realize I could use CheckedContinuation to interface between synchronous code and asynchronous code. So, AsyncSequence does have the mechanism to implement the behavior. It's just that AsyncStream doesn't support it, which makes sense to me.
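To make the CheckedContinuation idea concrete, here is a rough single-slot sketch. It is not the linked AsyncQueue implementation; `LatestBox`, `put`, and `next` are hypothetical names, and it assumes a single consumer. The producer overwrites the slot, and the consumer suspends on a continuation only when the slot is empty.

```swift
// Hypothetical single-slot mailbox; assumes only one task ever calls next().
actor LatestBox<Value: Sendable> {
    private var latest: Value?                               // newest unconsumed value
    private var waiter: CheckedContinuation<Value, Never>?   // suspended consumer, if any

    func put(_ value: Value) {
        if let waiter {
            // A consumer is already waiting: hand the value over directly.
            self.waiter = nil
            waiter.resume(returning: value)
        } else {
            // Nobody is waiting: overwrite whatever has not been consumed yet.
            latest = value
        }
    }

    func next() async -> Value {
        if let value = latest {
            latest = nil
            return value
        }
        // Slot is empty: suspend until the next put().
        return await withCheckedContinuation { continuation in
            waiter = continuation
        }
    }
}
```

A consumer loop would call `await box.next()` repeatedly and write each result to the file. Wrapping `next()` in a type that conforms to `AsyncSequence` is left out to keep the sketch short.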

For this specific issue, I think it can be resolved by moving the encode call into the body of the for-loop over the async sequence that has debounce applied.

Sorry that I wasn't clear on this. I don't have to use AsyncStream. I did it as an exercise and was open to all possible approaches using Swift concurrency.

BTW, do you plan to use a safe-save mechanism?

Yes. What's that mechanism? I'm interested to know about it (I googled it but it returned nothing).

I now see you are already using the ".atomic" flag above, so you are already doing that.


Ah, I see your point. In my old code, I didn't use an actor. That worked well because of the .atomic flag. I'm changing my code to use an actor just because it's clearer at the architecture level: now I have a component providing the file service, and other code will use this service rather than doing it in an ad hoc way (calling an async func).

BTW, .atomic is safe in the sense that the file is guaranteed not to be corrupted. However, it doesn't prevent data loss caused by the user killing the app.