A pitfall when using didSet and Task together - order can't be guaranteed

I often use this pattern to save the newest value to a file.

struct Foo {
    var value: Int {
        didSet {
            Task {
                // call an async func to save the value
            }
        }
    }
}
Complete Code Example (I added debugging code to print value in a few places)
import Foundation

import AsyncAlgorithms

actor FileService {
    static let shared = FileService()

    let url = URL.documentsDirectory.appending(path: "myfile")
    let (datum, cont) = AsyncStream.makeStream(of: Data.self)

    init() {
        Task {
            for await data in datum {
                let value = try! JSONDecoder().decode(Int.self, from: data)
                print("value in actor [consumer part]: \(value)")
                try data.write(to: url, options: .atomic)

            }
        }
    }

    func save(data: Data) {
        let value = try! JSONDecoder().decode(Int.self, from: data)
        print("value in actor [provider part]: \(value)")
        cont.yield(data)
    }
}

struct Foo {
    var value: Int = 1 {
        didSet {
            Task { [value = self.value] in
                print("value in client side task: \(value)")
                let data = try JSONEncoder().encode(value)
                await FileService.shared.save(data: data)
            }
        }
    }
}

func test() {
    var foo = Foo()
    foo.value = 2
    //sleep(4)
    foo.value = 3
    sleep(10)
}

test()

I didn't give the code much thought; I assumed it would just work, and in practice it did. However, while refactoring I tested it and found unexpected behavior: the order of the saves isn't guaranteed.

You can run the code above to see the output yourself. It produces different output on almost every run.

Note: the key detail in the setup is that there must be no delay between the foo.value = 2 and foo.value = 3 lines in test().

Example output 1:

value in client side task: 3
value in client side task: 2
value in actor [provider part]: 2
value in actor [provider part]: 3
value in actor [consumer part]: 2
value in actor [consumer part]: 3

Example output 2:

value in client side task: 2
value in client side task: 3
value in actor [provider part]: 3
value in actor [provider part]: 2
value in actor [consumer part]: 3
value in actor [consumer part]: 2

Example output 3:

value in client side task: 3
value in client side task: 2
value in actor [provider part]: 3
value in actor [consumer part]: 3
value in actor [provider part]: 2
value in actor [consumer part]: 2

My observations:

  1. First, the order of "value in client side task" is not fixed (see the logs above: it was "3, 2" or "2, 3"). This surprised me a bit. I assumed the code that prints the log runs in the same actor context and is therefore executed serially. I had thought the tasks were scheduled using a FIFO strategy in this case and hence would run in order, but the output shows that's not true.

  2. Second, the order of "value in client side task" and the order of "value in actor [provider part]" don't always match. Take output 1 as an example:

    client side task:      3, 2
    actor [provider part]: 2, 3
    

    So, even if I were able to maintain the correct order in the client side task, the await <someActorMethod> call could still cause someActorMethod to see the values in a different order? I can more or less understand the behavior (I think of an async func as a coroutine, and how coroutines are scheduled determines their execution order), but I find it counterintuitive. I don't mean it should behave differently; I just think it's tricky to use correctly.

Given the above observations, it seems the ordering assumption in the pattern I described at the beginning is completely wrong? When value changes slowly, the potential issue is hidden and the code works as expected. However, if value changes fast enough, the behavior becomes weird and hard to debug.

If so, what is the correct way to implement it? My requirement is very simple: whenever a value changes, I'd like to call an actor's method, and those calls should run in order.

Thanks for any suggestions.

Hmm you're right. I think every time you call Task or await you lose all guarantees of order when calling from synchronous code. To the FileService.shared object, foo.value = 2 and foo.value = 3 are basically called at the exact same time. Maybe you could use a Publisher + Publishers.Buffer to ensure that the order of calls in synchronous code is maintained?

Task{} indeed breaks enqueue order, and it's been a long standing issue like that.

The way it is implemented today, the Task initializer does not know where your code will end up running, so it has to enqueue on the global pool. If your code then immediately calls into an actor, you have just lost ordering, as in this snippet:

Task {   
  await someActor.hello()
}

because of the "hop to global -> (reorderings may happen, something else may have enqueued on actor already in the meantime) -> hop to actor, run the actor method".

We are proposing to fix this by having the Task start on the target actor, which would take the shape of Task { [isolated target] await target.hello() }. This might also be done automatically, but if you want the guarantee you'd want to spell it out like that.

Ongoing proposal about this over here: Closure isolation control


@ktoso Thanks for the explanation. In the following example the Task body makes no async call, but it still prints the values in random order. Is it for the same reason? I think in this case the Task code only hops to the global pool and then doesn't move? (Honestly, I don't really know what the global pool is. Is it the thread pool for a global actor?)

import Foundation

struct Foo {
    var value: Int = 1 {
        didSet {
            Task { [value = self.value] in
                print("value: \(value)")
            }
        }
    }
}

func test() {
    var foo = Foo()
    foo.value = 2
    //sleep(4)
    foo.value = 3
    sleep(6)
}

test()

Yeah, that's still expected not to guarantee any order (for better or worse).

You'd have to isolate the task to an actor as I showed before, but that's an incoming feature. Today you'll have to use an async sequence and send events through that, which is quite a pain that we aim to resolve with the linked proposal.

The global pool refers to the "global concurrent thread pool" where tasks run by default. Because it is multi-threaded, there is no guarantee about which of the tasks put onto it will run first.


Indeed. It's not fun to see code that is a pain to write, but that's what the OP is asking for. Future features are still in the future. So let's look at the pain straight in the eyes, and let's write the code.

I had to make Foo a class, so that it keeps track of the (one) Task that performs the async code. If Foo must remain a struct, then another type has to create that task.

final class Foo {
    private let continuation: AsyncStream<Int>.Continuation
    private let task: Task<Void, Never>

    var value: Int {
        didSet {
            continuation.yield(value)
        }
    }

    init(value: Int) {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

        self.value = value
        self.continuation = continuation
        self.task = Task {
            for await value in stream {
                // call an async func to save the value
                print(value)
            }
            print("Ended")
        }
    }

    deinit {
        // Necessary for "Ended" to be printed, and
        // generally to make sure everything is properly
        // cleaned up.
        task.cancel()
    }
}

final class FooTests: XCTestCase {
    func testFoo() async throws {
        var foo: Foo! = Foo(value: 1)

        // Eventually prints 2, 3, in this order
        foo.value = 2
        foo.value = 3

        try await Task.sleep(for: .seconds(1))

        // Prints "Ended"
        foo = nil
    }
}
Variant with a Foo struct and a distinct Queue class, probably closer to what actors WILL eventually be able to do.
final class Queue {
    typealias Operation = @Sendable () async -> Void
    private let continuation: AsyncStream<Operation>.Continuation
    private let task: Task<Void, Never>

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: Operation.self)

        self.continuation = continuation
        self.task = Task {
            for await operation in stream {
                await operation()
            }
            print("Ended")
        }
    }

    func enqueue(_ operation: @escaping Operation) {
        continuation.yield(operation)
    }

    deinit {
        // Necessary for "Ended" to be printed, and
        // generally to make sure everything is properly
        // cleaned up.
        task.cancel()
    }
}

struct Foo {
    // To remove this property, we'd need a global variable that
    // contains the Queue that would handle `didSet` for all Foo instances.
    let queue: Queue
    var value: Int {
        didSet {
            // Explicit [value] capture in order to avoid a compiler error:
            // > Mutable capture of 'inout' parameter 'self' is not allowed in concurrently-executing code
            queue.enqueue { [value] in
                // call an async func to save the value
                print(value)
            }
        }
    }
}

final class FooTests: XCTestCase {
    func testFoo() async throws {
        var queue: Queue! = Queue()

        do {
            var foo = Foo(queue: queue, value: 1)

            // Eventually prints 2, 3, in this order
            foo.value = 2
            foo.value = 3
        }

        try await Task.sleep(for: .seconds(1))

        // Prints "Ended"
        queue = nil
    }
}

Just to clarify, even if you isolate the closure passed to a task with a specific global actor (e.g. @MainActor or a custom global actor), it still hops to the global thread pool once for the enqueuing, breaking order?

So the following would still not guarantee order?

    Task { @MainActor in 
        myMainActorIsolatedType.doSomething()
    }

I always thought that if the closure passed to the Task's initializer was isolated to a specific global actor we'd be safe...
Also, would there be a difference between @MainActor as used in my example and a custom global actor and a custom global serial actor?

Yeah, currently the @MainActor attribute is insufficient to guarantee order. This is one of the problems that SE-0431 aims to address.


It would be nice to see a full code sample that addresses the OP's needs using the future language features: a teaser, just to see how much pain will be removed. It's not easy to come up with one just from reading the proposal.

Since Tasks by default execute their body on the concurrent global thread pool, even with the ordering guarantees provided by SE-0431, the code from the OP loses ordering as soon as the Tasks begin execution. Even if each task starts executing in an ordered manner, the individual threads doing the encode operation could be preempted by the OS and so become unordered before we get to the FileService.save(data:) call.

In order to preserve the order by the rules in SE-0431 you'll need to preserve the ordering from the beginning of the Task body execution all the way until the save call. Since it appears FileService is intended to be used basically as a global actor (i.e., FileService.shared rather than individual FileService instances), you could make that formal by marking it as @globalActor and then you should be able to maintain order as such:

struct Foo {
    var value: Int = 1 {
        didSet {
            Task { @FileService [value = self.value] in
                print("value in client side task: \(value)")
                let data = try JSONEncoder().encode(value)
                FileService.shared.save(data: data) // no await needed, we're already running on the FileService actor
            }
        }
    }
}

This may also require some refactoring to mark things as @FileService-isolated as needed. Of course, if the goal is to specifically keep the encode operation off the FileService actor, then the above won't work. In that case you'd probably need to create a different global actor to order the encode operations, and at the end of each of those kick off the @FileService-isolated task, something like:

struct Foo {
    var value: Int = 1 {
        didSet {
            Task { @EncodeService [value = self.value] in
                print("value in client side task: \(value)")
                let data = try JSONEncoder().encode(value)
                Task { @FileService in
                    FileService.shared.save(data: data)
                }
            }
        }
    }
}

This should let the ordering guarantees provided by @EncodeService then get propagated to the Task bodies isolated to @FileService.
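For readers wondering what the global-actor setup might look like, here is a minimal sketch. `ValueStore`, `save(_:)`, and `saveAll(_:into:)` are hypothetical names that don't appear in this thread, and the state is moved into a `@FileService`-isolated class so it can be called without `await`. Whether the saves land in submission order depends on the toolchain implementing the SE-0431 behavior discussed here, so the sketch itself makes no ordering claim.

```swift
// Marker actor that backs the `@FileService` attribute.
@globalActor
actor FileService {
    static let shared = FileService()
}

// Hypothetical storage type. Because the whole class is isolated to
// `@FileService`, closures isolated to the same global actor can call
// `save(_:)` synchronously.
@FileService
final class ValueStore {
    private(set) var saved: [Int] = []

    func save(_ value: Int) {
        saved.append(value)
    }
}

// Hypothetical driver: starts one `@FileService`-isolated task per value,
// waits for all of them, then reads back what was saved.
func saveAll(_ values: [Int], into store: ValueStore) async -> [Int] {
    var tasks: [Task<Void, Never>] = []
    for value in values {
        tasks.append(Task { @FileService in store.save(value) })
    }
    for task in tasks {
        await task.value
    }
    return await store.saved
}
```

A caller would create the store with `let store = await ValueStore()` and then run `await saveAll([2, 3], into: store)`.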

cc @ktoso @John_McCall does the above seem in-line with the intent of the guarantees provided by SE-0431?


That looks right to me.


Guess I should also note that the second snippet there will force all encode operations to proceed serially, that is, we wouldn't start encoding 3 until encoding 2 has completely finished. If one instead wanted to allow encoding to proceed in parallel but still somehow force the writes to proceed in an ordered manner relative to when the didSet is called, I'm not sure SE-0431 will help since you'd be trying to recover the original ordering with a fundamentally unordered process in the middle.


Assuming you just want to always save the latest value, an alternative approach would be to use an AsyncStream with a buffer size of 1 and policy of keeping only the newest value. You then just have a background task which sits in an infinite loop reading from the stream, and whenever multiple changes happen while that task is busy saving an earlier one, those changes will just automagically be deduplicated to preserve only the latest.

Ordering of changes is preserved because the yield method (by which you insert values into the stream) is synchronous (and guaranteed to never block; it's safe to use even from your GUI thread, for example).

It can get more complicated if you need to have multiple streams, such as if you have multiple properties that get bundled together under one save operation, but there are various ways to deal with that (like pre-bundling them and inserting that bundle into a stream, such that you still need only a single stream).
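A minimal sketch of the "keep only the newest value" idea, assuming the standard library's `AsyncStream.makeStream(of:bufferingPolicy:)` with `.bufferingNewest(1)`. The function name `latestOnlyDemo` is hypothetical, and appending to an array stands in for the real async save.

```swift
// Hypothetical demo: three rapid changes arrive before the saver reads any.
func latestOnlyDemo() async -> [Int] {
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingNewest(1)
    )

    // Producer side: `yield` is synchronous, so this is what `didSet`
    // would call. Each yield replaces the one pending value in the buffer.
    continuation.yield(1)
    continuation.yield(2)
    continuation.yield(3)
    continuation.finish()

    // Consumer side: in a real app this loop would live in a long-running
    // background task and call the async save function.
    var saved: [Int] = []
    for await value in stream {
        saved.append(value)
    }
    return saved
}
```

Because nothing reads from the stream until all three values have been yielded, only the last one survives, and the function returns `[3]`. In a real app the consumer runs concurrently, so intermediate values are saved whenever the consumer keeps up.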


First of all: Thanks for confirming my understanding of the issue and SE-0431. With concurrency I very often find myself in the weird position of being pretty sure I got it, but having this lingering feeling in the back of my mind that I might, in fact, not have gotten it. :smiley:

About the "drawback" (if you even want to call it that) you pointed out that remains (the encoding being serial in your example):
While I can imagine a scenario where parallelizing that would be desirable, I wouldn't say it's a problem that the proposal doesn't address it.
Or in other words: Something so fundamentally complex is most likely hard to implement in any case, and I don't think language features alone can help with that. I like SE-0431 because it makes what you read in code intuitive: Isolating the closure passed to Task in any way means it's "immediately scheduled" on that exact isolation context. That it also means people are no longer surprised by changes in order is kind of a positive side effect. :smiley:


Thanks, all, for the explanations and suggestions. I have two more related questions.

Q1: Will the closure isolation control proposal be implemented as a compile-time change or a Swift runtime change?

Suppose I have an app containing the following code. The app supports iOS 16.0 (an earlier iOS release) and above. I have already compiled it and submitted its binary to App Store.

Task { @FileService [value = self.value] in 
   print(value)
}

I wonder, after the proposal is implemented, what should I do to make sure my app gets the new behavior when it runs on iOS 16.0? Here is my understanding:

  • If the proposal will be implemented as a compile time change, I'll need to recompile and resubmit the app's binary.

  • If the proposal will be implemented as a Swift runtime change, I'll need to recompile and resubmit the app's binary too. And a Swift runtime will be bundled in my app for iOS 16.0 devices.

Could anyone please confirm whether my understanding above is correct?

Q2: How to implement the AsyncSequence based workaround before the proposal is implemented?

@gwendal.roue gave two solutions above. I prefer his second approach because it decouples client and service code. IIUC that's also what @wadetregaskis suggested.


My concern is that, since Queue is a class rather than an actor, it isn't thread-safe. Specifically, continuation.yield() might be called from different threads. That looks like an issue to me. In my experience, an API should be assumed thread-unsafe unless its documentation explicitly says it's thread-safe.


EDIT: On second thought, in the pattern I described at the beginning of the thread, the value isn't supposed to be modified from different threads. Otherwise there would be no way to guarantee the order in the application logic in the first place. So, with that constraint, my second question is moot in the scenario I have in mind (though there may be other scenarios where it's valid), and Gwendal Roué's approach should work fine. PS: also see his answer below.

The Queue class only contains immutable Sendable properties, so you can mark it Sendable in order to say it is "thread-safe". The documentation says:

The continuation conforms to Sendable, which permits calling it from concurrent contexts external to the iteration of the AsyncStream.
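As a sketch of what that could look like, here is the Queue from earlier in the thread with a `Sendable` conformance added. The `Log` actor and `runQueueDemo` function are hypothetical additions used only to exercise it, and the "Ended" print is left out for brevity.

```swift
final class Queue: Sendable {
    typealias Operation = @Sendable () async -> Void

    // Both stored properties are immutable and Sendable, which is what
    // allows the compiler to accept the `Sendable` conformance.
    private let continuation: AsyncStream<Operation>.Continuation
    private let task: Task<Void, Never>

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: Operation.self)
        self.continuation = continuation
        self.task = Task {
            // Operations run one at a time, in the order they were yielded.
            for await operation in stream {
                await operation()
            }
        }
    }

    func enqueue(_ operation: @escaping Operation) {
        continuation.yield(operation)
    }

    deinit {
        task.cancel()
    }
}

// Hypothetical helper that records the order in which operations ran.
actor Log {
    private(set) var items: [Int] = []
    func append(_ item: Int) { items.append(item) }
}

// Hypothetical demo: enqueue 100 operations, wait for the queue to drain,
// and return the recorded order.
func runQueueDemo() async -> [Int] {
    let queue = Queue()
    let log = Log()
    for i in 1...100 {
        queue.enqueue { await log.append(i) }
    }
    // A final operation signals that everything before it has finished.
    await withCheckedContinuation { (done: CheckedContinuation<Void, Never>) in
        queue.enqueue { done.resume() }
    }
    let items = await log.items
    // Keep the queue alive until here so `deinit` cannot cancel the task early.
    withExtendedLifetime(queue) {}
    return items
}
```

With a single producer, as in the demo, the operations run in enqueue order, so `runQueueDemo()` returns the numbers 1 through 100 in sequence. With several threads enqueuing at once, each `yield` is still safe, but the relative order between producers is whatever order the yields happen in.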
