How do you use AsyncStream to make Task execution deterministic?

As discussed previously on this forum, the order of execution for tasks on actors is not deterministic. Unlike serial queues, actors are not strictly first-in, first-out.

The WWDC22 session "Eliminate data races using Swift Concurrency" touches on this around the 23:20 mark and calls out why it matters.

Two solutions are proposed by the presenter to allow for task ordering:

  • Use a single Task, because a task executes from beginning to end.
  • Use an AsyncStream.

There are many use-cases where the first option is not viable. The second option seems promising as the presenter notes:

AsyncStream can be used to model an actual stream of events.

One task can iterate over the stream of events with a for-await-in loop, processing each event in turn.

An AsyncStream can be shared with any number of event producers, which can add elements to the stream while maintaining order.

Is there any sample code or example of this technique that can be shared to help some of us better understand how to implement that pattern properly?

Consider the use-case of a Database that is modelled as an actor:

actor Database { 
  func insert() {} 
  func update() {} 
  func delete() {} 
}

In such an implementation, it's critical that the order of tasks be deterministic. If a user were to create a record and then immediately update a record, the Database needs to process the insert() "task" first followed by the update() "task".

How can one use AsyncStream to model such a use-case as suggested by the session video?

Reference:
Task: Is order of task execution deterministic?

I'd start with something like this:

/// Represents a command that gets sent to the database.
protocol DatabaseCommand {
  // ...
}

struct InsertCommand: DatabaseCommand {
  // ...
}

struct UpdateCommand: DatabaseCommand {
  // ...
}

/// The producer side of the stream. Pass this to tasks that send commands to the database.
var commandInput: AsyncStream<any DatabaseCommand>.Continuation! = nil
/// The consumer side of the stream. Processed by databaseProcessingTask.
let commandStream = AsyncStream((any DatabaseCommand).self, bufferingPolicy: .unbounded) { continuation in
  commandInput = continuation
}

/// This task processes the commands.
/// The task runs until `commandInput` is closed.
let databaseProcessingTask = Task {
  for await command in commandStream {
    // Send command to database …
  }
}

// Other tasks can send commands to the stream
Task {
  // Guaranteed to be processed in order:
  commandInput.yield(InsertCommand())
  commandInput.yield(UpdateCommand())
}

Notes:

  • Create a type that represents a command that gets sent to the database.
  • Create an AsyncStream of database commands and store the stream's continuation in a variable.
  • Start one task (databaseProcessingTask) that for-loops over the stream and sends the incoming commands to the database. This loop is guaranteed to process the commands in the same order they are sent into the stream.
  • Pass the continuation to the parts of your program that submit database commands. continuation.yield(…) sends a command to the processing task.
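
For anyone who wants to run this end to end, here is a minimal self-contained sketch of the same idea. The Command enum, the log property and the apply(_:) method are made up for illustration; they are not part of the snippet above or of any real database API.

```swift
// Hypothetical command type; the cases are illustrative only.
enum Command: Sendable {
    case insert(id: Int)
    case update(id: Int)
    case delete(id: Int)
}

// Hypothetical database that just records what it was asked to do.
actor Database {
    private(set) var log: [String] = []

    func apply(_ command: Command) {
        switch command {
        case .insert(let id): log.append("insert \(id)")
        case .update(let id): log.append("update \(id)")
        case .delete(let id): log.append("delete \(id)")
        }
    }
}

let database = Database()

// The build closure runs synchronously inside the initializer,
// so the continuation is available as soon as the stream exists.
var capturedInput: AsyncStream<Command>.Continuation!
let commands = AsyncStream(Command.self, bufferingPolicy: .unbounded) { continuation in
    capturedInput = continuation
}
let input = capturedInput!

// Single consumer: handles one command at a time, in the order they were yielded.
let consumer = Task {
    for await command in commands {
        await database.apply(command)
    }
}

// Producers call the non-async `yield`.
input.yield(.insert(id: 1))
input.yield(.update(id: 1))
input.yield(.delete(id: 1))
input.finish() // ends the for-await loop once the buffer is drained

await consumer.value
let finalLog = await database.log
print(finalLog) // ["insert 1", "update 1", "delete 1"]
```

The ordering guarantee is relative to the order of the yield calls. Producers that call yield from different threads at the same moment still race with each other, as they would with any queue.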

Does this make sense?

(Edit: updated the code snippet to use the any P syntax.)

Thanks Ole. If I extend your suggestion a little bit to place it within the context of an application, I'm guessing it would look something like this:

class View: UIView { 
  func handleNewRecordButton() { 
    commandInput.yield(InsertCommand())
  }

  func handleSaveRecordButton() { 
    commandInput.yield(UpdateCommand())
  } 

  func handleDeleteRecordButton() { 
    commandInput.yield(DeleteCommand())
  }
}

Is this semantically equivalent to:

  func handleNewRecordButton() { 
    serialQueue.async { 
      database.process(InsertCommand())
    }
  }

I guess this works because AsyncStream.yield() is not async itself, thus it gets executed immediately and the value passed in is immediately "enqueued" into the AsyncStream?
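
A small sketch that seems to support this reading: yield is a plain synchronous call that returns a YieldResult, so a non-async handler can tell right away whether the element was buffered. The handleButton function and the event names are hypothetical stand-ins for the button handlers above.

```swift
var captured: AsyncStream<String>.Continuation!
let events = AsyncStream(String.self, bufferingPolicy: .unbounded) { captured = $0 }
let producer = captured!

// A plain synchronous function, standing in for a UIKit button handler.
func handleButton(_ name: String) -> Bool {
    // `yield` is not `async`: the element is buffered before this call returns.
    if case .enqueued = producer.yield(name) {
        return true
    }
    return false
}

let first = handleButton("insert")
let second = handleButton("update")
producer.finish()

// Drain the stream; elements come out in the order they were yielded.
var received: [String] = []
for await event in events {
    received.append(event)
}
print(first, second, received) // true true ["insert", "update"]
```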

Task execution order is a design concern only when asynchronous functions are involved, because suspension can lead to actor reentrancy — Swift Concurrency allows other independent tasks to be executed while previous tasks are suspended.

With the API interface defined in your problem, all the data manipulation operations are synchronous on the actor itself — there can be no suspension, or even the possibility of a cooperative yield to begin with. So the issues around task execution order will not happen, at least at the level of these individual synchronous functions.

--

If you provide an async actor.with(_:) for your API consumer, this would minimally satisfy at least atomicity and consistency typically expected of a database API:

  1. with(_:) being async ensures that all calls must respect actor isolation (incl. mutual exclusion).

  2. It shall accept a synchronous, sendable (isolated Database) -> Result closure, representing an atomic, non-suspending unit of database work to be run on the actor. The isolated Database parameter enables the closure scope to access all the synchronous DML operations.

Definition:

extension Database {
    // or call it `transaction(_:)` if you prefer.
    func with<Result>(
        // NON-ASYNC closure
        _ transaction: @Sendable (isolated Database) throws -> Result
    ) async rethrows -> Result {
        return try transaction(self)
    }
}

Usage:

let database = Database()
await database.with { database in
    // The closure scope is synchronous, and all the calls we made below
    // are synchronous.
    //
    // So it is guaranteed to run till the end, since there is never an
    // explicit cooperative yield, or a suspension (`await`).
    database.insert()
    database.update()
}

await database.with { database in
    // Ditto, but this is a second "transaction".
    database.delete()
}

It's not clear to me how this implementation is any different than just using a normal Task {}. I know that non-suspended code within a single Task {} block is run "top to bottom", but I'm trying to guarantee the execution order of discrete Tasks, or in this example discrete calls to await database.with {}.

It's my understanding that in this proposal the order of execution of the two calls to await database.with {} is non-deterministic, which is the problem I'm trying to solve.

A serial queue ensures that discrete blocks of work are run in a deterministic order of first-in, first-out. I'm trying to achieve a similar outcome using async/await.

Consider the following flow:

1. User presses New Record button.

// Task 1
Task { await database.newRecord() }

2. User then presses Update Record button

// Task 2
Task { await database.updateRecord() }

3. User then presses Delete Record button

// Task 3
Task { await database.deleteRecord() }

I want to make sure that those discrete Tasks run in the order 1, 2, 3. I can see how two AsyncStreams could achieve that, but that is as far as I've gotten.

Another approach I've taken was to serialize the tasks simply by chaining them to a globally-synchronized handler of the last one:

private enum Global {
    static var lock = os_unfair_lock_s()
    static var lastSerialTask: Task<Void, Never>?
}

public extension Task where Success == Void, Failure == Never {
    /// Serially dispatch async work from the context of the current executor.
    ///
    /// Unlike `Task {}` and `Task.detached {}`, when run from the ``MainActor``, this method
    /// guarantees that work will run in the order it was scheduled. Work scheduled from multiple
    /// actors will be started in a FIFO order, though the usual asynchronous guarantees apply.
    ///
    /// - Parameters:
    ///   - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
    ///   - operation: The operation to perform.
    nonisolated static func serial(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Void) {
        os_unfair_lock_lock(&Global.lock)
        let previousTask = Global.lastSerialTask
        Global.lastSerialTask = Self.detached(priority: priority) {
            await previousTask?.value
            await operation()
        }
        os_unfair_lock_unlock(&Global.lock)
    }
}

Usage is pretty straightforward:

class View: UIView {
  func handleNewRecordButton() {
    Task.serial { [weak self] in
      // `self` is optional here because of the weak capture.
      await self?.database.insert(...)
    }
  }

  func handleSaveRecordButton() {
    Task.serial { [weak self] in
      await self?.database.update(...)
    }
  }

  func handleDeleteRecordButton() {
    Task.serial { [weak self] in
      await self?.database.delete(...)
    }
  }
}

That said, if you want to avoid the unfair locks, you can prefix all those methods with @MainActor to get a similar result, since those methods can now be called synchronously from there:

actor Foobar {
    init() {}
    
    func printI(_ i: Int) {
        print("Iter \(i)")
    }
    
    @MainActor static var lastTask: Task<(), Never>?
    @MainActor static func run(closure: @escaping @Sendable () async -> ()) {
        let previousTask = lastTask
        lastTask = Task.detached {
            await previousTask?.value
            await closure()
        }
    }
}

let foobar = Foobar()
@MainActor func delegate() {
    for i in 0..<15 {
        Foobar.run {
            await foobar.printI(i)
        }
        print("Scheduled \(i)")
    }
}

Task { @MainActor in
    delegate()
}

(the above is more generalized though).

When I discussed this in the swift-server Slack, however, AsyncStream was also suggested, so @ole's suggestion is by no means wrong. I just found this approach easier to use, and easier to be sure it's working. This also came up last year on Twitter, where a few approaches were discussed: https://twitter.com/logancollins/status/1480602762618408966

This isn't right. See this.

Hi

I think there might also be a solution using AsyncChannel from “Swift Async Algorithms” as an entry point.

AsyncChannel is Sendable, so you can use it from different Tasks. Moreover, its send function suspends until the sent value has been consumed. Awaiting producers are registered in first-in, first-out order.

Task {
    for await command in channel {
        await command.execute()
    }
}

Task {
    await channel.send(command1)
}

Task {
    await channel.send(command2)
}

Task {
    await channel.send(command3)
}

os_unfair_lock_lock(&Global.lock)

This isn't right. See this.

I don't believe that is the case here, since a static property is a stable global variable, whose address won't change once it's created the very first time it is accessed.

Please see here: it says that this is a violation of the law of exclusivity and that using & may produce a different pointer each time you use it.

Argh, now I understand your problem better with the illustrations.

Stepping back from any particular solution, the root issue IMO is not that Task execution order is nondeterministic. Instead, it is that we are trying to make practical fire-and-forget calls to actors in response to MainActor UI event callbacks that are non-async. (Living at the edge of structured concurrency, as some might say.)

Your other actor-native logic consuming this Database API would not be subject to this problem, because structured concurrency makes sure that those (tasks from the) caller actors will await the completion of your Database actor call before moving on to whatever logic comes next (which may or may not include more Database actor calls).

This is all to say that it should not be necessary to refactor it into a Command-CommandQueue pattern (a legitimate solution, but arguably heavy-handed) for it to work in your case.

There are two approaches I can see:

  1. Wait for custom executors: you could then provide an Executor with a FIFO job guarantee for your Database actor.

  2. An interim "serial queue of tasks" that iterates over async tasks (in the form of () async -> Void closures), but only starts one after the previous one has completed.

You can make such a queue an actor too:

A minimal implementation of such a task queue:

import Foundation

final class TaskQueue: @unchecked Sendable {
    private var queue: [@Sendable () async -> Void] = []
    private var runningTask: Task<Void, Never>?
    private let lock = NSLock()

    func enqueue(_ action: @Sendable @escaping () async -> Void) {
        withLock {
            if runningTask != nil {
                // There is a suspended running task.
                // Queue the `action` and leave it to that task to eventually drain & execute it.
                queue.append(action)
            } else {
                // There is no running task.
                // Mark that we are now running, and execute `action` right away.
                runningTask = Task {
                    await action()

                    // Iteratively process all the enqueued tasks, that have come concurrently while
                    // we have been executing or suspending.
                    // `dequeue()` clears `runningTask` once the queue is empty.
                    while let nextTask = self.dequeue() {
                        await nextTask()
                    }
                }
            }
        }
    }

    private func withLock<R>(_ action: () -> R) -> R {
        lock.lock()
        defer { lock.unlock() }
        return action()
    }

    /// Returns the next queued action. When the queue is empty, `runningTask` is cleared in
    /// the same critical section, so an action enqueued concurrently cannot be stranded
    /// between the emptiness check and the reset.
    private func dequeue() -> (@Sendable () async -> Void)? {
        lock.lock()
        defer { lock.unlock() }
        guard !queue.isEmpty else {
            runningTask = nil
            return nil
        }
        return queue.removeFirst()
    }
}

where it can be consumed on MainActor as such:

let database = Database()
let taskQueue = TaskQueue()

let editAction = UIAction { _ in
    taskQueue.enqueue {
        await database.with { $0.updateRecord() }
    }
}

let deleteAction = UIAction { _ in
    taskQueue.enqueue {
        await database.with { $0.deleteRecord() }
    }
}

let createAction = UIAction { _ in
    taskQueue.enqueue {
        await database.with { $0.insertRecord() }
    }
}

So now, instead of trying to shoehorn the database API into a Command-CommandQueue pattern in order to solve it with AsyncSequence, you get to keep the database API in a typical actor shape. You can also turn this TaskQueue (or whatever name you'd prefer) into a no-op in the future, when you can eventually supply a custom executor to the Database actor.

Ah, I see what you mean. This should be fine then:

import os

private enum Global {
    // Heap-allocated once, so the lock has a stable address for its whole lifetime.
    static let lock: UnsafeMutablePointer<os_unfair_lock> = {
        let pointer = UnsafeMutablePointer<os_unfair_lock>.allocate(capacity: 1)
        pointer.initialize(to: os_unfair_lock())
        return pointer
    }()
    static var lastSerialTask: Task<Void, Never>?
}

public extension Task where Success == Void, Failure == Never {
    /// Serially dispatch async work from the context of the current executor.
    ///
    /// Unlike `Task {}` and `Task.detached {}`, when run from the ``MainActor``, this method
    /// guarantees that work will run in the order it was scheduled. Work scheduled from multiple
    /// actors will be started in a FIFO order, though the usual asynchronous guarantees apply.
    ///
    /// - Parameters:
    ///   - priority: The priority of the task. Pass `nil` to use the priority from `Task.currentPriority`.
    ///   - operation: The operation to perform.
    nonisolated static func serial(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Void) {
        os_unfair_lock_lock(Global.lock)
        let previousTask = Global.lastSerialTask
        Global.lastSerialTask = Self.detached(priority: priority) {
            await previousTask?.value
            await operation()
        }
        os_unfair_lock_unlock(Global.lock)
    }
}

Doesn't the use of Task and await in this example negate the serialization that you're trying to achieve with @MainActor?

Three consecutive calls to TaskQueue.enqueue will create three separate Tasks, which in turn will each call await. Just because the entry point was on the main actor doesn't mean that the newly spawned Tasks and their calls to await will be executed in serial order, does it?

The way I see this, the await within the @MainActor version of enqueue creates a suspension point waiting on the private enqueue() implementation. The call to queue.append() is where the serialization is required so that each action is enqueued in the correct order, but I don't see that being guaranteed with this suggestion.

Your observation is right; I overlooked that aspect when drafting the task queue implementation using actor. I have updated it to be a Sendable class.

Thanks for the update. The end result is that this implementation doesn't use an actor and is very similar to just using a normal class that has an internal serial queue or lock to provide serialization.

In such an implementation, I'd just use a GCD serial queue and async continuations to implement it, effectively creating an object that uses GCD internally but exposes an async API.
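
Roughly what I have in mind, as a sketch only. SerialStore, its records array and the queue label are invented for illustration; a real store would hold actual database state.

```swift
import Dispatch

/// Hypothetical store; `records` stands in for real database state.
final class SerialStore: @unchecked Sendable {
    private let queue = DispatchQueue(label: "example.serial-store")
    private var records: [String] = []

    /// Non-async entry point: the work is on the serial queue before this call
    /// returns, so calls made in sequence from one thread run in FIFO order.
    func enqueue(_ work: @escaping @Sendable (inout [String]) -> Void) {
        queue.async { work(&self.records) }
    }

    /// Async entry point: suspends the caller until the queue has run `work`.
    func perform<T: Sendable>(_ work: @escaping @Sendable (inout [String]) -> T) async -> T {
        await withCheckedContinuation { continuation in
            queue.async {
                continuation.resume(returning: work(&self.records))
            }
        }
    }
}

let store = SerialStore()
store.enqueue { records in records.append("insert") }
store.enqueue { records in records.append("update") }

// Queued behind the two writes above, so it observes both of them.
let snapshot = await store.perform { records in records }
print(snapshot) // ["insert", "update"]
```

The serialization here comes entirely from the GCD queue; the async method only bridges the result back to the caller.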

Nothing wrong with that approach, but I was wondering if the GCD component could be replaced with actors.

Looks good :+1:

I've written a version of a queue which can run async closures with n amount of parallelization here: playgrounds/TaskQueue.swift at main · gshahbazian/playgrounds · GitHub

Initializing with TaskQueue(concurrency: 1) provides FIFO execution.

Would love to get some feedback on this implementation : )

Indeed, there are various approaches to serializing anonymous async functions.
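
One more variant, as a rough sketch: feed the closures themselves through an AsyncStream and let a single task await them one by one. AsyncSerialQueue and Recorder are hypothetical names for this example, not an existing API.

```swift
/// Hypothetical queue that runs async closures one at a time, in enqueue order.
final class AsyncSerialQueue: Sendable {
    typealias Operation = @Sendable () async -> Void
    private let continuation: AsyncStream<Operation>.Continuation

    init() {
        var captured: AsyncStream<Operation>.Continuation!
        let stream = AsyncStream(Operation.self, bufferingPolicy: .unbounded) { captured = $0 }
        continuation = captured
        // Single consumer: the next operation starts only after the previous one returns.
        Task {
            for await operation in stream {
                await operation()
            }
        }
    }

    /// Non-async, so it can be called from a synchronous UI callback.
    func enqueue(_ operation: @escaping Operation) {
        continuation.yield(operation)
    }
}

actor Recorder {
    private(set) var values: [Int] = []
    func append(_ value: Int) { values.append(value) }
}

let queue = AsyncSerialQueue()
let recorder = Recorder()

queue.enqueue {
    // Slow first operation: the later ones still wait for it.
    try? await Task.sleep(nanoseconds: 50_000_000)
    await recorder.append(1)
}
queue.enqueue { await recorder.append(2) }
queue.enqueue { await recorder.append(3) }

// Wait until everything enqueued so far has run.
await withCheckedContinuation { (done: CheckedContinuation<Void, Never>) in
    queue.enqueue { done.resume() }
}
let order = await recorder.values
print(order) // [1, 2, 3]
```

As with the other approaches, the order is that of the enqueue calls; callers racing on different threads are not ordered relative to each other.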

There does not appear to be a "native" Swift Concurrency solution right now, perhaps only until one can define a Custom Executor and request an unstructured/detached task to run on a specific Executor.

I don't believe it guarantees FIFO execution because there's no guarantee what order an actor receives its messages in. (At least FIFO from the external caller's perspective.)

This is discussed in an earlier forum post:

Task: Is order of task execution deterministic?

I'm also currently digging through that rabbit hole. The mentioned TaskQueue actor will fail because enqueue is asynchronous and creates a suspension point. In other words, if you have a reference to that actor in two different places and call enqueue to schedule work at the same time, there is no guarantee which one will win and be enqueued first.

Btw. this type looks a lot like AsyncChannel.

The partial draft that I have in my mind looks like this:

func doWork() async {
  await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
    queueReference.enqueue {
      // do work here, then call the continuation to resume `doWork`
      continuation.resume()
    }
  }
}

That enqueue operation should be a non-async function that'd take a @Sendable @escaping () -> Void closure. Throwingness can be modeled with a nested do / catch and a surrounding withCheckedThrowingContinuation.
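
A sketch of the throwing variant, assuming a GCD serial queue as a stand-in for the FIFO queue that is still missing. The names serialQueue, runSerially and RecordNotFound are made up for this example.

```swift
import Dispatch

struct RecordNotFound: Error {}

// Stand-in for the FIFO queue; a DispatchQueue does not run on the cooperative pool.
let serialQueue = DispatchQueue(label: "example.fifo")

func runSerially<T: Sendable>(_ work: @escaping @Sendable () throws -> T) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        serialQueue.async {
            // Nested do / catch: resume exactly once, with the value or the error.
            do {
                continuation.resume(returning: try work())
            } catch {
                continuation.resume(throwing: error)
            }
        }
    }
}

let value = try await runSerially { 42 }

var caughtExpectedError = false
do {
    _ = try await runSerially { () throws -> Int in throw RecordNotFound() }
} catch is RecordNotFound {
    caughtExpectedError = true
} catch {
    // Any other error is unexpected in this sketch.
}
print(value, caughtExpectedError) // 42 true
```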

It's a lot of boilerplate, but in theory it'd do the trick. However, the main problem is creating a FIFO queue that also operates on the cooperative thread pool. That's the part I haven't figured out yet.