Passing values to a Task/Actor in a serial manner

Hi, I have some really simple code that sends values from a synchronous func to an async func.
The values received in the async func are not in order.

    func sendValues() {
        for i in 0..<1000 {
            Task {
                await count(i)
            }
        }
    }
    
    func count(_ value: Int) async {
        print(value) // values received NOT in order
    }

As far as I know, unstructured Tasks are scheduled on the global concurrent executor, so they may execute in any order.

But why does an async function on the MainActor (which uses a serial executor) also receive values out of order?

    func sendValues() {
        for i in 0..<1000 {
            Task { @MainActor in
                await count(i)
            }
        }
    }
    
    @MainActor func count(_ value: Int) async {
        print(value) // values received NOT in order
    }

Question 1: Is there an async step between calling 'Task { @MainActor ... }' and scheduling the task's job on the MainActor serial executor that causes this?

Question 2: Is there any way to enter Swift Concurrency from synchronous code in order? For example, a Combine publisher publishes values in order, and we want to consume them in an async func/actor in the same order.

Thanks :)

Even though actors have serial executors, they do not guarantee that work executes in the order it was submitted. The WWDC21 session "Swift concurrency: Behind the scenes" explains why.

One great insight I read on these forums a while ago was (paraphrased):
Actors are for isolating state. Tasks are for executing things serially.

With that in mind, we can say that on the async side you'd want a single task that processes the values. In your example this can easily be done by creating a single task and putting the loop inside of it. However, I assume the example is a simplified version of your code, and this will not suffice. What you probably need is a task that waits for values and processes them when they arrive (and they should arrive in order). Depending on your needs, this can be done in various ways, but here's a simple example using AsyncStream:

// This is a bit of a strange initialization of the stream, but in Swift 5.9
// there will be a similar makeStream convenience function on AsyncStream
func makeStream() -> (AsyncStream<Int>, AsyncStream<Int>.Continuation) {
    var continuation: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int>() { continuation = $0 }
    return (stream: stream, continuation: continuation!)
}

func sendValues(to continuation: AsyncStream<Int>.Continuation) {
    for i in 0..<1000 {
        continuation.yield(i)
    }
    continuation.finish()
}

func processValues(from stream: AsyncStream<Int>) {
    Task {
        for await value in stream {
            await count(value)
        }
    }
}

func count(_ value: Int) async {
    print(value) // values received in order
}

let (stream, continuation) = makeStream()
processValues(from: stream)
sendValues(to: continuation)
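
For readers on Swift 5.9 or later, the hand-written helper above can probably be replaced by the standard library's `AsyncStream.makeStream(of:)`. The following is only a minimal sketch of the same producer/consumer split; the names `consumer` and `received` are illustrative, and the consumer returns what it saw so the order can be checked.

```swift
// Requires Swift 5.9+ for AsyncStream.makeStream(of:).
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

// A single consumer task: values are handled in the order they were yielded.
let consumer = Task { () -> [Int] in
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}

// Synchronous producer: yield(_:) does not suspend, so it can be called
// from ordinary non-async code.
for i in 0..<1000 {
    continuation.yield(i)
}
continuation.finish()

let received = await consumer.value
print(received == Array(0..<1000))
```

The default buffering policy is unbounded, so nothing is dropped even if the consumer starts late; a bounded policy would change that.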

Note: I am not familiar with Apple's frameworks, so I cannot say much about the interaction between Combine and Swift concurrency. There might be functionality available in Combine that easily accomplishes exactly what you need.
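
On the Combine side of Question 2, one approach that should work is to forward each published value into the stream's continuation from a `sink` subscriber. This is a sketch under assumptions, not a recommendation from the Combine documentation: it assumes Swift 5.9+ and a platform where Combine is available, and `subject`, `bridge`, `bridgeContinuation` and `bridgeConsumer` are hypothetical names.

```swift
#if canImport(Combine)
import Combine

// Hypothetical upstream; any publisher with Failure == Never would do.
let subject = PassthroughSubject<Int, Never>()

// Requires Swift 5.9+ for AsyncStream.makeStream(of:).
let (bridge, bridgeContinuation) = AsyncStream.makeStream(of: Int.self)

// Forward values to the stream in the order the publisher delivers them.
let subscription = subject.sink(
    receiveCompletion: { _ in bridgeContinuation.finish() },
    receiveValue: { value in bridgeContinuation.yield(value) }
)

// Single consumer task on the async side.
let bridgeConsumer = Task { () -> [Int] in
    var received: [Int] = []
    for await value in bridge {
        received.append(value)
    }
    return received
}

for i in 0..<1000 {
    subject.send(i)
}
subject.send(completion: .finished)

let bridged = await bridgeConsumer.value
print(bridged == Array(0..<1000))
subscription.cancel()
#endif
```

The ordering here comes from the single consumer task and the stream's buffer, not from anything Combine-specific.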

Tasks, whether detached or not, do not execute serially, they execute concurrently, and may execute in arbitrary order (tasks created after other tasks may execute first). There is no way, currently, to perform serial execution without modeling it yourself, like the AsyncStream example you show.

Indeed, tasks execute concurrently and in arbitrary order. However, within a task, things execute serially.

The only execution within a task is in the closure you create it with, and the ordering constraints there are the same as if you called it in any async context. That execution is subject to the same concurrent execution as any other async context, where any suspension allows other execution to continue, including other tasks enqueued from within the current task. So being in a task has no special behavior here.

Suspensions may happen, other tasks may be running concurrently, and new tasks may be started from a task, but the code that executes within a task runs serially from the moment the task is created until it is finished.

This quote from the structured concurrency evolution proposal might be more clear:

A task is the basic unit of concurrency in the system. Every asynchronous function is executing in a task. In other words, a task is to asynchronous functions, what a thread is to synchronous functions. That is:

  • All asynchronous functions run as part of some task.
  • A task runs one function at a time; a single task has no concurrency.
  • When a function makes an async call, the called function is still running as part of the same task (and the caller waits for it to return).
  • Similarly, when a function returns from an async call, the caller resumes running on the same task.
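
The bullets above can be illustrated with a single task that awaits each call in a loop. This is just a sketch: `Recorder` is a hypothetical actor that exists only to record the order of calls, and `count(_:into:)` is a variant of the original `count` that takes it as a parameter.

```swift
// Hypothetical actor used only to record the order in which values arrive.
actor Recorder {
    private(set) var values: [Int] = []
    func record(_ value: Int) { values.append(value) }
}

@MainActor func count(_ value: Int, into recorder: Recorder) async {
    // Hops to the Recorder actor; the caller waits until it returns.
    await recorder.record(value)
}

let recorder = Recorder()

// One task, one loop: each awaited call completes before the next
// iteration starts, even though the task hops between executors.
let task = Task {
    for i in 0..<1000 {
        await count(i, into: recorder)
    }
}
await task.value

let recorded = await recorder.values
print(recorded == Array(0..<1000))
```

Compare this with the original code, which created 1000 separate tasks and therefore had no ordering between them.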

Right, but that's not a useful description that helps here. That synchronous code runs synchronously shouldn't surprise anyone.

I think the fundamental issue here is that the proposal uses task to describe structured concurrency while Task is unstructured concurrency, and the two aren't equivalent. For example, Tasks don't participate in cooperative cancellation, but structured tasks, such as those created by TaskGroups, do. (In fact I think TaskGroups are the only way to create structured tasks?) This vocabulary has led to a lot of confusion in the community.

Not sure what you mean here, since we’re talking about asynchronous code.

The original question was about why the main actor processed the values out of order. What I tried to convey is that in Swift’s async world, an actor is not used to execute things in order, but a task is. An actor is used to isolate state.

They are all tasks. Structured or unstructured doesn’t matter here.

Once again, that's not true. The same code executed synchronously in a Task will execute in exactly the same way in an actor. Really the only difference is that execution in an actor is isolated to the actor by default (you can also have it execute outside the isolation if you don't access protected state). You can get the same isolation for a Task by executing the Task from within the actor, or if the actor is a global actor.

In regards to concurrent execution, sure. There are other differences though.

There's some confused/confusing wording being used in this thread. The Swift book should definitely be improved on concurrency topics such as these, but meanwhile:

Yes, that's the root of the problem. Task{} is scheduled on the global pool, begins running the closure, notices it is @MainActor and hops to it. This is arguably not great, because that is exactly why we lose ordering in such code:

Task { @MainActor in a() }
Task { @MainActor in b() }

which may "arrive" at the main actor in any order... The "SerialExecutor" protocol only means that once a task runs on such an actor, it executes in the serial fashion you'd expect of a task. The actor doesn't really do much here in terms of ordering.

Yes and no... The only correct way to do this today is very verbose: you have to make an AsyncStream, make a single Task{ for await message in stream {} } to consume these messages, and stream.yield(.message) into it from the outside world. This will produce the expected order: stream.yield(a); stream.yield(b).

It's not great to have to be so verbose about it, and the primary reason Task{} can't do this is that it doesn't know at enqueue time where the code will end up executing. As far as the runtime is concerned it was just passed "some async closure", and we don't have a way to check at runtime "hey, is this actually specifically going to immediately jump to some actor?". We're missing the ability to express and check such a thing in the language/compiler.

I'd personally slot this as something we should improve upon in future releases but no plans have been made about this yet. Even with such "better" Task{} you would not be guaranteed order because priority escalation on the returned task could boost it in front of the queue.

One alternative idea would be to introduce a send actor.thing() operation, prototyped here, that would be guaranteed to do the right thing. But both more discussion and Swift evolution are needed to figure out if this is the right solution or if something else might be.


I also see this thread getting a bit confused in wording and terminology.

Manolo's phrase that "Tasks are for executing things serially / in-order" is correct. That is the only way in today's concurrency model to guarantee strict order. That is why the "consume the message stream from one task" approach above works.

Whether a task is executed on an actor or just the global pool does not matter at all, to be honest. The only thing that guarantees order is how code is executed "step by step" in a Task. It also does not matter for purposes of ordering whether a task is structured or unstructured.

Hope this helps

Can you clarify what you mean here? A Task simply encapsulates some set of work, like any async context. It makes no different ordering guarantees than typical code execution, so what special behavior are you talking about here? Given the initial global execution that you mentioned, Tasks don't guarantee strict ordering, so what do you mean?

Edit to add: As far as I know, the only way to guarantee strict ordering is to do what you suggested and pass everything through a stream. Every other async construct is concurrent and can run in arbitrary order. To me, the fact that code runs in order in a Task is meaningless, since the only guarantee there is the same guarantee you get everywhere else.

A task is an asynchronous context, there's no other concept of an asynchronous context. If you're in an asynchronous context, you're in a task.

Sure "it just works" in a task due to the ordering "of normal code" but it is far from obvious or trivial that it works. I'm just stating that that's where the guarantee is actually made. Asynchronous code is not the same as synchronous code, it takes effort to get this ordering there, even if "obvious".

I'm still not sure what behaviors you're referring to. To ask another way, if we didn't have this guarantee (which I still don't see), what would the behavior be?

Async code would not behave correctly at all, violating program order. All I'm stating is that that is the only guarantee you have with regards to order in Swift concurrency today. There's no "FIFO" guaranteeing-once-enqueued executor, and there is no public way to enqueue tasks onto an actor in guaranteed order -- see the linked send operation which does the latter. All you can rely on to get order is that "obvious" ordering guarantee that a Task gives you.

Right, I'm just asking what this special guarantee is. Is it literally that code run in a task behaves the same as code not run in a task? That this:

Task { @MainActor in
  syncOne()
  syncTwo()
}

runs the same as this?

@MainActor
func run() {
  syncOne()
  syncTwo()
}

(barring any difference in hops to MainActor)

That doesn't seem like a special guarantee as, like you said, it couldn't work correctly any other way. And I'm not sure how it helps in regards to the original problem. You can, of course, use this behavior to try and make operations within an actor more atomic, but you still have to be careful of reentrancy and other callers starting work that isn't otherwise blocked by protected state. If that's what you're talking about, okay, but that seems more like a property of actors (functions called within isolation act as if they're synchronous and so don't suspend) rather than Tasks themselves.

Add some awaits there; that's what I'm talking about; and yes, it's "obvious" and necessary. It is the only strict order guarantee the runtime provides which is what I'm highlighting.

None of this is a property of Swift actors.

No need to convince me personally about the problems associated with reentrancy. I am highlighting, though, that Swift actors currently have no properties that help here. My first reply in this thread also includes a link to a prototype of what would (or rather, does) solve the problem the opening post asked about. There are no other solutions today: either an async stream (or any other queue, really) with a single Task for consuming, or the linked prototype "send"-like operation.
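
The "async stream plus a single consuming Task" option can be wrapped in a small helper so call sites stay short. What follows is a rough sketch assuming Swift 5.9+; `SerialTaskQueue`, `Recorder` and `demo()` are hypothetical names, not existing API, and this is not the linked "send" prototype.

```swift
// Hypothetical helper: funnels async closures through one AsyncStream
// and runs them one at a time, in enqueue order, on a single task.
final class SerialTaskQueue: Sendable {
    typealias Job = @Sendable () async -> Void

    private let continuation: AsyncStream<Job>.Continuation
    let worker: Task<Void, Never>

    init() {
        // Requires Swift 5.9+ for AsyncStream.makeStream(of:).
        let (stream, continuation) = AsyncStream.makeStream(of: Job.self)
        self.continuation = continuation
        self.worker = Task {
            for await job in stream {
                await job() // the next job starts only after this one returns
            }
        }
    }

    // Callable from synchronous code; yield(_:) does not suspend.
    func enqueue(_ job: @escaping Job) {
        continuation.yield(job)
    }

    func finish() {
        continuation.finish()
    }
}

// Hypothetical actor used only to record the order of execution.
actor Recorder {
    private(set) var values: [Int] = []
    func record(_ value: Int) { values.append(value) }
}

func demo() async -> [Int] {
    let queue = SerialTaskQueue()
    let recorder = Recorder()
    for i in 0..<1000 {
        queue.enqueue { await recorder.record(i) }
    }
    queue.finish()
    await queue.worker.value // wait until every job has run
    return await recorder.values
}

let recorded = await demo()
print(recorded == Array(0..<1000))
```

Because each job is awaited to completion before the next one is dequeued, jobs do not interleave at suspension points the way reentrant actor calls can.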

I wonder if some of the confusion comes from us using different definitions of 'task':

  • The construct that creates a new task, including the closure that is executed, i.e.: Task { ... }

versus:

  • The asynchronous context that is created by this, in which the code executes. Including all async (or sync) functions that are called from the closure.

See this line from the evolution proposal:

When a function makes an async call, the called function is still running as part of the same task (and the caller waits for it to return).

Both run in a task. It's just that in case of the second one, the task is created somewhere else. For example:

Task { @MainActor in
  run() // Your run() function runs in the task that is created here
}

Task {
  await run() // Same: your run() function runs in the task that is created here.
}

I don't recall this line but it seems to contradict how async functions typically run, where the callee determines its own execution context. Or perhaps I don't know what "part of the same task" entails. I can see how your examples are equivalent in regards to execution (global executor -> main actor) (though hopefully they won't be the same in the future), but I don't know what you mean by "runs in the task that is created here". AFAIK, this especially isn't true in the second case, where the Task's closure is run on the global executor but run() executes on the main actor, exactly the same as if I'd called it from an async function or actor. So in what sense is that run "in" the task? In the first example I can see it being "in" the task because you've explicitly given the task closure the main actor context. So what does it mean to be "in" the task?

I would describe the second case as follows:

  1. The task starts executing the closure on the global executor.
  2. The task hops to the executor of the main actor to execute run().
  3. When run() returns, the task hops back to the global executor to execute the rest of the closure.

In this specific example there's not much to execute for step 1 and 3, but hopefully the idea is clear.
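
The three steps can be made visible with a small sketch. Assumptions: `run()` here returns whether it ran on the main thread (the original `run()` returns nothing), and `Task.detached` is used only because top-level code in a script is itself main-actor-isolated, so a plain `Task { }` there would start on the main actor rather than the global executor.

```swift
import Foundation

// Synchronous main-actor function; reports whether it ran on the main thread.
@MainActor func run() -> Bool {
    Thread.isMainThread
}

let task = Task.detached { () -> [String] in
    // Step 1: the closure starts on the global executor.
    var steps = ["1: closure started"]

    // Step 2: the same task hops to the main actor to execute run().
    let onMainThread = await run()
    steps.append("2: run() executed on the main thread: \(onMainThread)")

    // Step 3: run() has returned and the task continues the closure.
    steps.append("3: closure resumed after run() returned")
    return steps
}

let steps = await task.value
steps.forEach { print($0) }
```

All three steps belong to one task, which is why their order is fixed even though the executor changes in between.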

So by "in the task" you mean "on the executor provided by the task's closure"?