A couple of months ago, I posted about how me and my team experienced our journey of fully embracing Swift Concurrency in several projects. I also mentioned some points where we felt like we were too early or said differently, where there was no good solution available in the open-source land or also in the standard library.
I also recognized that some those tools (deadlines, queues, ...) probably do not quite fit with the standard library or related projects and are also quite niche, so I began open-sourcing my implementations.
One of the remaining topics for us was the missing support for queueing asynchronous functions or essentially restricting 1-n amount of tasks executing code in parallel (if anyone is interested in the use cases we had for this, they are described in my other post). There are already several popular packages out there, which try to solve this problem, for instance Queue
, Lock
or Semaphore
which I have been using. While I do not have any noteworthy problems with any of these libraries, there were concerns raised about possible priority inversion.
Recently, there was a proposal to add manual priority escalation APIs (SE-0462) which has been accepted and I thought that with this set of APIs it might now be possible to implement another approach to this problem which I saw here.
I've now ended up with an experimental implementation of this, which seems to be passing the unit tests I wrote for it. Due to the popularity of the other projects and implied demand for this kind of API I wanted to get some comments and feedback early on so together we could end up shaping this API so it fits most the communities use cases, not only mine. I'd be more than happy to receive any feedback, but some examples would be if it would be useful to you, naming, API surface, but also implementation details or maybe even misuses of APIs.
AsyncLimiter
The surface is almost identical to what John McCall came up with in the thread I linked previously:
public final class AsyncLimiter: Sendable {
public init(limit: Int)
public func withControl<R, E>(isolation: isolated (any Actor)? = #isolation, operation: () async throws(E) -> sending R) async throws -> R
}
You'd use it like this, kind of an asynchronous mutex, if you will:
let limiter = AsyncLimiter(limit: 1)
Task {
try await limiter.withControl {
// do something that should only be run once at a time
try await Task.sleep(for: .seconds(5))
}
}
// Somewhere else
Task {
try await limiter.withControl {
// this will only ever be executed after the 5 seconds of the previous task have passed
}
}
But you could also do something like this:
let limiter = AsyncLimiter(limit: 2)
await withThrowingTaskGroup(of: Void.self) { taskGroup in
for index in 0..<10 {
taskGroup.addTask {
try await limiter.withControl {
try await Task.sleep(for: .seconds(1))
}
}
}
}
This allows two child tasks entering a critical region at once, and suspends the rest of them, until slots free up again.
Ordering
The tasks will be executed in the same order as withControl
is called. If one task is done, it will resume the next suspended task. Something to keep in mind is, that this:
let limiter = AsyncLimiter(limit: 1)
Task {
try await limiter.withControl {
print("1")
}
}
Task {
try await limiter.withControl {
print("2")
}
}
... will still not guarantee to print "1" and "2" because of how Task
scheduling works, only if both of these tasks would share the same isolation or executor.
Cancellation
If the operation inside the withControl
closure is already running, the cancellation behavior of your code inside that closure will be used. However, if you'd cancel a Task
where the operation is currently suspended, as in, there was no free slot, CancellationError
will be thrown. This is the reason why withControl
currently cannot fully embrace typed throws.
Priority
It can happen, that you add some operation to AsyncLimiter
with a Task
which has a low priority but later you want to have a Task
with high priority that also adds some work to the same AsyncLimiter
. Without special care, this would make the high priority task wait for a low priority task, which would result in priority inversion. However, with the help of the newly pitched priority escalation APIs, AsyncLimiter
would (permanently) boost the low priority task to high priority, essentially making it inherit the priority (note, that this will not change the order of suspended tasks).
An example of how this would work:
let limiter = AsyncLimiter(limit: 1)
Task(priority: .low) {
// prio: low
try await limiter.withControl {
// prio: low
try await Task.sleep(for: .seconds(5))
// prio: high
}
}
// Somewhen later
Task(priority: .high) {
try await limiter.withControl {
// do high prio stuff
}
}
Final thoughts
This API could lead to deadlocks. Namely, when you do this:
let limiter = AsyncLimiter(limit: 1)
try await limiter.withControl {
try await limiter.withControl {
// ...
}
}
Not sure if there is something, that could be done to prevent this, other than documenting it. This behavior is similar to Mutex
, though. Maybe an overload to withControl
could be added like withCheckedControl
, that detects the recursive use, but I did not think a lot about this yet...
Because of how I implemented the task escalation behavior withControl
is currently O(n)
where n
is the count the currently suspended and active operations. Maybe something can be done to improve this to O(limit)
, but I did not think much about this yet either.
Also, because of how I implemented the task escalation behavior, UnsafeCurrentTask
s are stored outside of the closure of withUnsafeCurrentTask
. In the documentation it is recommended against doing so, but I was not sure if this applies to my implementation as well because the UnsafeCurrentTask
is removed after the operation has completed, so it is not possible that the UnsafeCurrentTask
lives longer than its Task
. Maybe someone can comment if this is actually safe to do, or if another solution has to be implemented.
Finally, here is the package: GitHub - ph1ps/swift-concurrency-limiter. To try it out you will need the latest nightly (I tested it with the nightly of 14th of March).
Thank you for reading. Looking forward to your feedback.