somu
(somu)
1
Hi,
Overview
- I have a
async function f1(), I would like f1 executions to happen sequentially.
-
f1 is being called by many other functions.
- If
f1 is called before the previous execution of f1 is completed then it needs to wait for the previous execution to completed before proceeding.
Questions:
- What is a good way to implement this?
- Is my attempt reasonable or is there a better way to implement this?
- It seems like a common scenario, but haven't found a straight forward way, or am I missing something obvious?
My attempt:
actor Car {
private var lastTask: Task<(), Error>?
private func f1() async throws {
print("f1 started")
try await Task.sleep(nanoseconds: 4_000_000_000)
print("f1 completed")
}
func f2() async throws {
_ = await lastTask?.result
let currentTask = Task {
try await f1()
}
lastTask = currentTask
_ = await currentTask.result
}
}
Invoking
let car = Car()
Task {
try await car.f2()
}
Task {
try await car.f2()
}
Output
f1 started
f1 completed
f1 started
f1 completed
2 Likes
Above code does not prevent concurrent tasks from entering f1 before the previous task exited. The code only works for two tasks.
When there are two (or more) tasks waiting and one task is executing, the waiting tasks all await the same condition. So all of them can enter when the condition is met.
Just add a third task to try it out.
3 Likes
somu
(somu)
3
@nikolai.ruhe
Thanks a lot!! you are spot on, it doesn't work with more than 2.
So what is the right way to ensure sequential execution?
MarSe32m
(Sebastian Toivonen)
4
It depends on how "low level" you want to drop in the Concurrency toolbox, but if you only want to play with tasks, the following will ensure sequential execution
actor Car {
private var currentTask: Task<Void, Error>?
private func f1() async throws {
print("f1 started")
try await Task.sleep(nanoseconds: 4_000_000_000)
print("f1 completed")
}
func f2() async throws {
guard let queuedTask = currentTask else {
currentTask = Task {
try await f1()
}
try await currentTask?.value
return
}
let newTask = Task {
try? await queuedTask.value
try await f1()
}
currentTask = newTask
try await newTask.value
}
}
2 Likes
@MarSe32m: I think in your solution every call to f2 will capture the previous Task instance, forming an ever growing "linked list" of task instances.
This might not be a problem, but if f2 is called often, there's at least some amount of memory leaking.
Edit: As explained in the replies this is wrong! Thanks @MarSe32m for pointing this out.
1 Like
MarSe32m
(Sebastian Toivonen)
6
It does capture, yes, but that is intentional to ensure sequential execution. Note at each call to f2() will only capture the latest task and it will be cleaned up once that call to f2() finishes so the "linked list" will not grow forever.
Any kind of solution to this problem, as it is stated, will require memory allocation, so I wouldn't characterize this as a memory leak. If f2() is called very very often and very very frequently, then the problem ought to be solved in a different way in the first place (instead of using the Car actor etc.)
2 Likes
somu
(somu)
7
@MarSe32m Thanks a lot!!! that works as expected!
I guess in my code I wasn't waiting for the lastTask inside the Task { }. Thanks for pointing it out.
Just curious if you had any other alternative approaches to solve the same problem?
MarSe32m
(Sebastian Toivonen)
8
If you are willing to use continuations, you could implement a semaphore type and use it to sequentialize the execution.
actor Semaphore {
private var count = 0
private var waiters = [UnsafeContinuation<Void, Never>]()
public init(count: Int) {
self.count = count
}
private func wait() async {
count -= 1
if count >= 0 { return }
await withUnsafeContinuation {
waiters.append($0)
}
}
private func signal() {
count += 1
if waiters.isEmpty { return }
waiters.removeFirst().resume()
}
public func withTurn(_ procedure: @Sendable () async throws -> ()) async rethrows {
await wait()
defer { signal() }
try await procedure()
}
}
actor Car {
private let semaphore = Semaphore(count: 1)
private func f1() async throws {
print("f1 started")
try await Task.sleep(nanoseconds: 4_000_000_000)
print("f1 completed")
}
func f2() async throws {
try await semaphore.withTurn {
try await f1()
}
}
}
With this kind of construct you could also eliminate the need of a separate f2() function, by just using the semaphore inside f1()
func f1() async throws {
try await semaphore.withTurn {
print("f1 started")
try await Task.sleep(nanoseconds: 4_000_000_000)
print("f1 completed")
}
}
Of course when using a semaphore you need to be careful to not introduce "deadlocks" when one function acquires it after it has already been acquired resulting in an indefinite suspension.
I have implemented one in one of my toy projects or you could use a more advertised one.
4 Likes
somu
(somu)
9
@MarSe32m Thanks a lot for pointing out the alternate approach.
I am not familiar with UnsafeContinuation and I will read about it and come back.
There is so much to learn about this. I need to refresh the basics and then will read about UnsafeContinuation. Thanks a lot!
and it will be cleaned up once that call to f2() finishes
You are right, thank you!
I was under the wrong assumption that the task would retain the closure as long as a Task handle is around. But this is not the case, it is released as soon as the task finishes execution.
My bad.
2 Likes