Task: Is order of task execution deterministic?

Thank you for the extremely useful explanation.

Concurrent programming was not meant to be easy; never assume any ordering of events, and prepare for all possibilities. (I learned this three decades ago while writing and maintaining code for a comms-network protection system.)

Why is this enqueueing feature not already in the Swift concurrency system?

A lot of programmers out there with little or no experience in concurrency are being forced to write code that involves concurrency. They need all the good help they can get.


Hm... I think the runtime function I made to support isolated synchronous deinit could be used for an "enqueue, but don't wait" operation.

If we are already on the correct executor, it executes the code immediately. Does this fit the desired semantics? It also expects the function to take the closure context as owned, so the closure is effectively self-consuming. I'm not sure it is possible to express this convention in the type system, but if not, it could be solved with a heap allocation and a thunk.

But I guess the main challenge is actually expressing, in the type system, a closure type isolated to a specific actor instance. This introduces dependent types, which, AFAIK, Swift does not support.

// Hypothetical syntax: `@self` / `@a` (isolation to one actor *instance*) does not exist in Swift.
// getCtx and getFunc are placeholders for extracting the closure's context and function pointer.
extension Actor {
    func perform(_ work: @self () -> Void) {
        _performOnExecutor(getCtx(work), getFunc(work), self.unownedExecutor)
    }
}

actor MyActor {
    var k: Int = 0
    func inc() { k += 1 }
}

let a = MyActor()
let b = MyActor()
let aWork: @a () -> Void = {
    a.inc() // No await: we are inside the actor context of a
}

// Should be an error: cannot convert @a () -> Void to expected type @b () -> Void
// But the Swift type system cannot express this at the moment.
b.perform(aWork)

I think that would work for my use case.

I just spent time debugging a similar issue. In my case, I have an actor that wants to update its state based on a publisher provided from elsewhere. If two values come in too quickly, they can arrive out of order when using a nonisolated sink. My solution was to subscribe to the publisher using .values, iterating the publisher as an AsyncSequence inside a task owned by the actor. This means a couple of things:

  1. The values from the publisher are delivered into the actor's context in the order that they are published.
  2. This task needs to be bound properly to the lifetime of the actor and must avoid retain cycles, especially if the publisher might be something like CurrentValueSubject and never finish (as it is in my case).

The bonus here is that the code for handling failures and completion becomes cleaner, since the loop exits on completion, and it throws on error.

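import Combine

struct SomeState {} // placeholder for the real state type

actor MyActor {
  // Task is generic, so the stored type must spell out its parameters.
  var currentMonitor: Task<Void, Never>?

  deinit {
    self.currentMonitor?.cancel()
  }

  func monitor(_ publisher: AnyPublisher<SomeState, Never>) {
    self.currentMonitor?.cancel()
    // Iterate the publisher as an AsyncSequence so values are applied in publish order.
    // [weak self] avoids a retain cycle when the publisher never finishes.
    self.currentMonitor = Task(priority: .utility) { [weak self] in
      for await update in publisher.values {
        await self?.applyUpdate(update)
      }
    }
  }

  func applyUpdate(_ update: SomeState) {
    // Mutate actor state here.
  }
}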

This approach doesn't handle the cases where you just need a generic enqueue of some kind, but if you have this sort of synchronous stream of values, having the actor subscribe to the stream seems like a reasonable approach.

As an aside, you pretty much never want .background priority for anything except truly optional work, as it's subject to harsh system limits. For instance, when in low power mode, .background items aren't processed at all. (Assuming this priority aligns with DispatchQueue priority.)


Good catch on that. Thanks for pointing it out. I've updated my example.

Sorry, but as another aside, you probably don't want any explicit priority at all on that. Instead, you should inherit priority from the context, unless you really need to downgrade the priority of those items relative to others. In the general case like this you probably don't need to set priority at all.

Also fair. The usage the example came from does explicitly want the work to be done at a lower priority. But yes, in most cases you wouldn't.

Actually, Swift has kind-of dependent types via the isolated keyword. The following is a working example in the current implementation:

extension Actor {
    func perform(_ work: (isolated Self) -> Void) {
        work(self)
    }
}

actor MyActor {
    var k: Int = 0
    func inc() { k += 1 }
}

let a = MyActor()
let work: (isolated MyActor) -> Void = { x in
    x.inc() // No await: the closure is isolated to its parameter x (here, a)
}
await a.perform(work)

swift_task_performOnExecutor would not work nicely here: it expects the work function to take no arguments other than the context. But a new, similar function could be created.
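The isolated Self pattern above can also be generalized so the closure returns a value. This is a minimal sketch, assuming Swift 5.7 or later; run and Counter are hypothetical names chosen for illustration, not standard-library APIs:

```swift
// Sketch only: `run` and `Counter` are hypothetical names, not standard APIs.
extension Actor {
    /// Runs `body` isolated to this actor and returns its result.
    /// Methods in an `extension Actor` are isolated to `self`, so the
    /// closure's `isolated Self` parameter can be satisfied synchronously.
    func run<T: Sendable>(
        _ body: @Sendable (isolated Self) throws -> T
    ) rethrows -> T {
        try body(self)
    }
}

actor Counter {
    var k = 0
    func inc() { k += 1 }
}

let counter = Counter()
// One await to hop onto the actor; inside the closure no await is needed.
let value: Int = await counter.run { isolatedCounter in
    isolatedCounter.inc()
    isolatedCounter.inc()
    return isolatedCounter.k
}
print(value) // 2
```

Note that this still awaits the hop onto the actor, so it gives synchronous access to isolated state but not the "enqueue, but don't wait" behavior discussed earlier.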

I think this actually might work.


I know I'm late to this thread, but I wanted to provide a concrete and short test case that surprised me when it failed:

@MainActor
func test_mainActor_taskOrdering() async {
    var counter = 0
    var tasks = [Task<Void, Never>]()
    for iteration in 1...100 {
        tasks.append(Task {
            counter += 1
            XCTAssertEqual(counter, iteration) // often fails
        })
    }
    
    for task in tasks {
        _ = await task.value
    }
}

Given that Tasks inherit the context within which they are created (unless they are detached), I expected that Tasks created from an actor or @MainActor context would begin executing in the order in which they were created.

The practical effect of there being no way to achieve deterministic ordering when bridging from synchronous code to asynchronous code via a Task is that my engineering team is avoiding actor, async, and await in our codebase, except in the rare instances where input order does not matter.

I'm hoping that deterministic Task scheduling can be achieved down the line; I really want to ditch my serial DispatchQueue usage in favor of more compile-time-safe actor types!

P.S. If there's a better place for me to post these thoughts, please let me know and I'll happily move this comment.


Yeah, this is a common and problematic pitfall in the current model. Swift actors are neither FIFO, nor do they give you the usual actor operation of "enqueue this thing please", which is the baseline operation in other actor runtimes. Instead, Swift focused on always awaiting calls and allowing the priority of awaited-on tasks to be boosted... In many ways this is very nice and interesting, but there definitely are cases where it is quite problematic.

There are IMHO multiple ways to approach this problem, and we'll need to argue and figure out which to surface (though I'd personally argue that all those are important and worth surfacing):

  • custom actor executors, where one could enforce FIFO semantics; (today's Swift actors are not FIFO)
  • tighter control over re-entrancy; some actor methods may prefer to opt into lack of reentrancy, in order to make actors more useful and sane for programming critical logic (today's Swift actors are always reentrant at suspension points)
  • allow actually "enqueueing" / "sending" work to an actor without awaiting on it (today it is impossible to "not wait" on async calls, which forces us to create a new Task {} in order to access actors, and thus hit the scheduling problem you noticed).
    • even if we changed the Task{} enqueue semantics, this would still be "scary" ordering-wise, because being able to await on task.value means the work could get reordered again due to priority boosting (and the non-FIFO nature of default actors).

Your example specifically hits the third point.

Note: This isn't a promise of a direction, but a personal opinion, based on previous work using actors I did, as well as using them in Swift a lot ever since we shipped them. Actual solutions we end up with might differ.


For the sake of this discussion, let us look at how a "send" can be easily simulated in today's runtime, and how it guarantees what you are looking for in this test.

Specifically, an execution of your test using Task{} might look like this:

// Task
Task{}: 1
Task{}: 2
...
Task{}:100
run:1
run:2
...
run:13
run:15
: Precondition failed: counter:14 != iteration:15

but if we allowed for a send / "enqueue" like operation, we'd always get the ordering you expected:

import Foundation

actor DeterministicOrderThanksToSend {
    var counter = 0

    func test_mainActor_taskOrdering() async {
        for iteration in 1...100 {
            fputs("send:\(iteration)\n", stderr)

            // `send` stands for a simulated "enqueue, don't await" operation; it is not a Swift API.
            self.send {
                fputs("run:\(iteration)\n", stderr)
                self.counter += 1
                precondition(self.counter == iteration, "counter:\(self.counter) != iteration:\(iteration)") // holds: sends run in order
            }
        }

        // Poll until all sent work has run.
        while self.counter < 100 {
            try? await Task.sleep(until: .now.advanced(by: .milliseconds(100)), clock: .continuous)
        }
    }
}

We always reliably get the right result:

// send
send: 1
send: 2
...
send:100
run:1
run:2
...
run:99
run:100

So conceptually, this is implementable right now, today. But we need to have a wider discussion with the concurrency team about how we want to approach this problem at large.
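As a rough illustration of that claim: send is not a Swift API, but one way to simulate it today is a single long-lived consumer task draining an AsyncStream of jobs that take the actor as an isolated parameter. This is a sketch under those assumptions (Swift 5.7 or later; Mailbox, send, finish, and run are hypothetical names), not how the concurrency team would necessarily implement it:

```swift
// Sketch only: Mailbox, send, finish and run are hypothetical names; "send"
// is simulated with an AsyncStream here, not a language feature.
actor Mailbox {
    typealias Job = @Sendable (isolated Mailbox) -> Void

    var counter = 0
    var log: [Int] = []

    private let jobs: AsyncStream<Job>
    private let continuation: AsyncStream<Job>.Continuation

    init() {
        var captured: AsyncStream<Job>.Continuation!
        // The build closure runs synchronously, so `captured` is set before use.
        jobs = AsyncStream { c in captured = c }
        continuation = captured
    }

    /// Enqueues a job without awaiting it. Callable from synchronous code;
    /// jobs sent from the same context are consumed in send order (FIFO).
    nonisolated func send(_ job: @escaping Job) {
        continuation.yield(job)
    }

    /// Signals that no more jobs will be sent; run() returns once the buffer drains.
    nonisolated func finish() {
        continuation.finish()
    }

    /// The single consumer: executes jobs one at a time, isolated to this actor.
    func run() async {
        for await job in jobs {
            job(self)
        }
    }
}

let box = Mailbox()
let consumer = Task { await box.run() }

// Fire-and-forget from a plain loop; no Task is created per message.
for iteration in 1...100 {
    box.send { mailbox in
        mailbox.counter += 1
        precondition(mailbox.counter == iteration)
        mailbox.log.append(iteration)
    }
}
box.finish()
await consumer.value
```

Ordering holds because every send is a synchronous yield into one FIFO buffer that has exactly one consumer. Other calls into the actor can still interleave between jobs, though, because the loop suspends at each await.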

@John_McCall was just replying in another thread here (Swift project focus areas in 2023, post #11) about how that guarantee is pretty weak and may not be enough across multiple "hops", but personally I disagree that the guarantee is too weak to be useful. It is the usual way a lot of actor code is built, and it would also tremendously help bridge the non-async and async worlds of actors. Very frequently we just need this ordering between a pair of "streams" or otherwise happens-before-related pieces of code: making sure the "init" is enqueued before the "done", both issued from a synchronous context, which is incredibly hard to get right nowadays (examples include streams, task cancellation handlers, and non-async code).

So a send IMHO would be very useful; it remains to be seen how we'll solve these issues in Swift though.


A form of this that I'm personally truly wishing for, because it'd help in many other places (including task cancellation handlers and interop with streams) and because it is the important uni-directional "I don't need a reply" concept in networked (or IPC) actor systems, is something like this:

actor DeterministicOrderThanksToSend {
    var counter = 0

    func test_mainActor_taskOrdering() async {
        for iteration in 1...100 {
            send increment(iteration: iteration) // hypothetical `send` syntax, not valid Swift today
            // guaranteed enqueue order (1>2>3>...>100)
            // can't await the result, so as not to risk reordering by escalation
            // can be used from non-async code as well
        }

        while self.counter < 100 {
            try? await Task.sleep(until: .now.advanced(by: .milliseconds(100)), clock: .continuous)
        }
    }

    func increment(iteration: Int) async {
        // yes, this method is async, but we don't await it
        fputs("run:\(iteration)\n", stderr)
        self.counter += 1
        precondition(counter == iteration, "counter:\(counter) != iteration:\(iteration)") // holds: sends are processed in order
    }
}

So... not much of an immediate solution or answer for your problem, but it is yet another case showcasing the need for more primitives, and since I'm on a day off I figured I might as well spend the time to write it up. I hope this was interesting, and I hope we'll get to discuss such semantics in depth with the concurrency team in the future.


Thank you for the in-depth response @ktoso! I particularly appreciated your thoughts on priority boosting during await, as that helped me understand (some of) the design optimizations that led to the current world.

A quick few thoughts:

  • From my 30,000 ft view, I don't think FIFO or stricter reentrancy control is a requirement for wider adoption of Swift's concurrency model; reasoning about reentrancy has always been difficult. That said, improving our ability to reason about reentrancy would be a boon to us all, and would be a concrete improvement over the status quo.
  • You hit the nail on the head re send. When I first started playing with the modern Swift concurrency model, I tried writing nonisolated methods that then spawned a Task to send the async instruction to the actor without awaiting the result, and quickly discovered the lack of deterministic ordering when spawning Tasks from a single context. A send semantic that can enqueue ordered work (or at least order the beginning of the work up until the first suspension point) would be much welcomed, and would enable me to ditch the DispatchQueue almost entirely.

+1

Thanks @ktoso for opening this discussion. I have been working on a few server side libraries/applications lately and the missing send functionality made me walk back on using an actor in almost all cases.
Like you said, ordering is a big problem, since you often need strict ordering when you interact with an actor. Secondly, the "workaround" right now is spawning an unstructured Task, which also has some performance overhead, and oftentimes the Task is not even awaited on. (I haven't looked at your proposed implementation, but maybe we can make it much more lightweight when we already know that nobody is awaiting or cancelling it.)

The one thing I am thinking about is whether every method on an actor should be callable via send, or whether actors should explicitly declare which methods support it. My thinking here is that if you already know a method is only ever sent, then you don't even have to take care of cancellation.

The workaround using AsyncStream actually performs pretty well, although the interface is bizarre (escaping a continuation from a closure?!) and there's a fair bit of boilerplate. It's certainly faster than spawning a new Task for each invocation, because you only have to spawn one Task. Then, you put a loop inside that task that keeps awaiting more values from the AsyncStream.

Here's how I did it in CSProgress (on the concurrency branch, which I still haven't merged into main yet)

Would the following implementation guarantee serial execution?

import Combine
import Foundation

public actor AsyncDispatchQueue {
    
    private let streamContinuation: AsyncStream<() async -> Void>.Continuation
    
    public init() {
        var streamContinuation: AsyncStream<() async -> Void>.Continuation?
        let stream = AsyncStream { continuation in
            streamContinuation = continuation
        }
        self.streamContinuation = streamContinuation!
        
        Task {
            for await work in stream {
                await work()
            }
        }
    }
    
    public nonisolated func send(work: @escaping () async -> Void) {
        streamContinuation.yield {
            await work()
        }
    }
    
    public func wait<T>(work: @escaping () async throws -> T) async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            streamContinuation.yield {
                do {
                    let result = try await work()
                    continuation.resume(returning: result)
                } catch {
                    continuation.resume(throwing: error)
                }
            }
        }
    }
    
    public func wait<T>(work: @escaping () async -> T) async -> T {
        await withCheckedContinuation { continuation in
            streamContinuation.yield {
                let result = await work()
                continuation.resume(returning: result)
            }
        }
    }
    
}

It does, and I've used something similar in my own projects. It also turns out you can do better in a few ways.
Firstly, there's no reason for AsyncDispatchQueue to be an actor since AsyncStream's continuation acquires a lock; a class is fine.
Secondly, the closures for wait don't actually need to be escaping, since they're guaranteed to exist for as long as the caller exists. You can work around the compiler by wrapping them in a class and using withoutActuallyEscaping.
Thirdly, if you're willing to accept the use of @_unsafeInheritExecutor, you can avoid task hops; the semantics of wait should then be the same as with*Continuation, in that it will only suspend once the operation has been enqueued on the AsyncStream.

If you combine those, you end up with something like this (adapted from Substrate/TaskStream.swift at commit 3a79735933f2512f5b1d6be45847b86a7565c04d in troughton/Substrate on GitHub):

public final class AsyncDispatchQueue {
    @usableFromInline final class TaskHolder<R> {
        @usableFromInline let task: @Sendable () async throws -> R
        
        @inlinable
        init(task: @escaping @Sendable  () async throws -> R) {
            self.task = task
        }
    }
    
    let taskHandle: Task<Void, Never>
    @usableFromInline let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
    
    public init(priority: TaskPriority = .medium) {
        var taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
        let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
            taskStreamContinuation = continuation
        }
        self.taskStreamContinuation = taskStreamContinuation!
        
        self.taskHandle = Task.detached(priority: priority) {
            for await task in taskStream {
                await task()
            }
        }
    }
    
    deinit {
        self.taskHandle.cancel()
    }
    
    @inlinable @inline(__always)
    @_unsafeInheritExecutor
    public func wait<T>(@_implicitSelfCapture _ perform: @Sendable () async -> T) async -> T {
        let taskStreamContinuation = self.taskStreamContinuation
        return await withoutActuallyEscaping(perform) { perform in
            let task = TaskHolder<T>(task: perform)
            let result: T = await withUnsafeContinuation { continuation in
                taskStreamContinuation.yield { [unowned task, continuation] in
                    let result = try! await task.task()
                    continuation.resume(returning: result)
                }
            }
            withExtendedLifetime(task) {}
            return result
        }
    }
    
    @inlinable @inline(__always)
    @_unsafeInheritExecutor
    public func wait<T>(@_implicitSelfCapture _ perform: @Sendable () async throws -> T) async throws -> T {
        let taskStreamContinuation = self.taskStreamContinuation
        return try await withoutActuallyEscaping(perform) { perform in
            let task = TaskHolder<T>(task: perform)
            let result: T = try await withUnsafeThrowingContinuation { continuation in
                taskStreamContinuation.yield { [unowned task, continuation] in
                    do {
                        continuation.resume(returning: try await task.task())
                    } catch {
                        continuation.resume(throwing: error)
                    }
                }
            }
            withExtendedLifetime(task) {}
            return result
        }
    }
    
    public func send(@_inheritActorContext @_implicitSelfCapture _ perform: @escaping @Sendable () async -> Void) {
        self.taskStreamContinuation.yield(perform)
    }
}

This conversation and suggestions above got me to put together a small (well-tested!) library with a serial-to-concurrent-context-sending FIFO queue and a nonisolated-to-actor-isolated-context-sending actor-ordered queue. I put these queues together without using underscored attributes, so I'm hoping this API will remain stable as new Swift releases drop.

As of the time of writing, the PRs introducing these queues are still open for review: consider this post an invitation to review! Hopefully y'all find this repo as useful as I found the above thread.
