Async function - to be executed sequentially

Hi,

Overview

  • I have a async function f1(), I would like f1 executions to happen sequentially.
  • f1 is being called by many other functions.
  • If f1 is called before the previous execution of f1 is completed then it needs to wait for the previous execution to completed before proceeding.

Questions:

  1. What is a good way to implement this?
  2. Is my attempt reasonable or is there a better way to implement this?
  3. It seems like a common scenario, but haven't found a straight forward way, or am I missing something obvious?

My attempt:

actor Car {
    private var lastTask: Task<(), Error>?
    
    private func f1() async throws {
        print("f1 started")
        try await Task.sleep(nanoseconds: 4_000_000_000)
        print("f1 completed")
    }
    
    func f2() async throws {
        
        _ = await lastTask?.result
        
        let currentTask = Task {
            try await f1()
        }
        
        lastTask = currentTask

        _ = await currentTask.result
    }
}

Invoking

let car = Car()

Task {
    try await car.f2()
}

Task {
    try await car.f2()
}

Output

f1 started
f1 completed
f1 started
f1 completed
2 Likes

Above code does not prevent concurrent tasks from entering f1 before the previous task exited. The code only works for two tasks.

When there are two (or more) tasks waiting and one task is executing, the waiting tasks all await the same condition. So all of them can enter when the condition is met.

Just add a third task to try it out.

3 Likes

@nikolai.ruhe

Thanks a lot!! you are spot on, it doesn't work with more than 2.

So what is the right way to ensure sequential execution?

It depends on how "low level" you want to drop in the Concurrency toolbox, but if you only want to play with tasks, the following will ensure sequential execution

actor Car {
    private var currentTask: Task<Void, Error>?
    
    private func f1() async throws {
        print("f1 started")
        try await Task.sleep(nanoseconds: 4_000_000_000)
        print("f1 completed")
    }
    
    func f2() async throws {
        guard let queuedTask = currentTask else {
            currentTask = Task {
                try await f1()
            }
            try await currentTask?.value
            return
        }
        let newTask = Task {
            try? await queuedTask.value
            try await f1()
        }
        currentTask = newTask
        try await newTask.value
    }
}
1 Like

@MarSe32m: I think in your solution every call to f2 will capture the previous Task instance, forming an ever growing "linked list" of task instances.

This might not be a problem, but if f2 is called often, there's at least some amount of memory leaking.

Edit: As explained in the replies this is wrong! Thanks @MarSe32m for pointing this out.

1 Like

It does capture, yes, but that is intentional to ensure sequential execution. Note at each call to f2() will only capture the latest task and it will be cleaned up once that call to f2() finishes so the "linked list" will not grow forever.

Any kind of solution to this problem, as it is stated, will require memory allocation, so I wouldn't characterize this as a memory leak. If f2() is called very very often and very very frequently, then the problem ought to be solved in a different way in the first place (instead of using the Car actor etc.)

2 Likes

@MarSe32m Thanks a lot!!! that works as expected!

I guess in my code I wasn't waiting for the lastTask inside the Task { }. Thanks for pointing it out.

Just curious if you had any other alternative approaches to solve the same problem?

If you are willing to use continuations, you could implement a semaphore type and use it to sequentialize the execution.

actor Semaphore {
    private var count = 0
    private var waiters = [UnsafeContinuation<Void, Never>]()
    
    public init(count: Int) {
        self.count = count
    }
    
    private func wait() async {
        count -= 1
        if count >= 0 { return }
        await withUnsafeContinuation {
            waiters.append($0)
        }
    }
    
    private func signal() {
        count += 1
        if waiters.isEmpty { return }
        waiters.removeFirst().resume()
    }
    
    public func withTurn(_ procedure: @Sendable () async throws -> ()) async rethrows {
        await wait()
        defer { signal() }
        try await procedure()
    }
}

actor Car {
    private let semaphore = Semaphore(count: 1)
    
    private func f1() async throws {
        print("f1 started")
        try await Task.sleep(nanoseconds: 4_000_000_000)
        print("f1 completed")
    }
    
    func f2() async throws {
        try await semaphore.withTurn {
            try await f1()
        }
    }
}

With this kind of construct you could also eliminate the need of a separate f2() function, by just using the semaphore inside f1()

func f1() async throws {
    try await semaphore.withTurn {
        print("f1 started")
        try await Task.sleep(nanoseconds: 4_000_000_000)
        print("f1 completed")
    }
}

Of course when using a semaphore you need to be careful to not introduce "deadlocks" when one function acquires it after it has already been acquired resulting in an indefinite suspension.

I have implemented one in one of my toy projects or you could use a more advertised one.

3 Likes

@MarSe32m Thanks a lot for pointing out the alternate approach.

I am not familiar with UnsafeContinuation and I will read about it and come back.

There is so much to learn about this. I need to refresh the basics and then will read about UnsafeContinuation. Thanks a lot!

and it will be cleaned up once that call to f2() finishes

You are right, thank you!

I was under the wrong assumption that the task would retain the closure as long as a Task handle is around. But this is not the case, it is released as soon as the task finishes execution.

My bad.

2 Likes