[RFC] Asynchronous queueing

A couple of months ago, I posted about how my team and I experienced our journey of fully embracing Swift Concurrency in several projects. I also mentioned some points where we felt we were too early, or, said differently, where no good solution was available in open-source land or in the standard library.

I also recognized that some of those tools (deadlines, queues, ...) probably do not quite fit the standard library or related projects and are also quite niche, so I began open-sourcing my implementations.

One of the remaining topics for us was the missing support for queueing asynchronous functions, or essentially restricting how many tasks (1 to n) may execute code in parallel (if anyone is interested in the use cases we had for this, they are described in my other post). There are already several popular packages out there that try to solve this problem, for instance Queue, Lock, or Semaphore, which I have been using. While I do not have any noteworthy problems with any of these libraries, concerns were raised about possible priority inversion.

Recently, a proposal to add manual priority escalation APIs (SE-0462) was accepted, and I thought that with this set of APIs it might now be possible to implement another approach to this problem, which I saw here.

I've now ended up with an experimental implementation of this, which seems to pass the unit tests I wrote for it. Due to the popularity of the other projects and the implied demand for this kind of API, I wanted to get comments and feedback early on, so that together we could shape this API to fit most of the community's use cases, not only mine. I'd be more than happy to receive any feedback; some examples would be whether it would be useful to you, naming, API surface, but also implementation details or maybe even misuses of the APIs.

AsyncLimiter

The surface is almost identical to what John McCall came up with in the thread I linked previously:

public final class AsyncLimiter: Sendable {
  public init(limit: Int)
  public func withControl<R, E>(isolation: isolated (any Actor)? = #isolation, operation: () async throws(E) -> sending R) async throws -> R
}

You'd use it like this, as a kind of asynchronous mutex, if you will:

let limiter = AsyncLimiter(limit: 1)

Task {
  try await limiter.withControl {
    // do something that should only be run once at a time
    try await Task.sleep(for: .seconds(5))
  }
}

// Somewhere else
Task {
  try await limiter.withControl {
    // this will only ever be executed after the 5 seconds of the previous task have passed
  }
}

But you could also do something like this:

let limiter = AsyncLimiter(limit: 2)

await withThrowingTaskGroup(of: Void.self) { taskGroup in
  for _ in 0..<10 {
    taskGroup.addTask {
      try await limiter.withControl {
        try await Task.sleep(for: .seconds(1))
      }
    }
  }
}

This allows two child tasks to enter the critical region at once and suspends the rest until slots free up again.

Ordering

Tasks are executed in the same order in which withControl is called. When one task finishes, it resumes the next suspended task. Something to keep in mind is that this:

let limiter = AsyncLimiter(limit: 1)
Task {
  try await limiter.withControl {
    print("1")
  }
}
Task {
  try await limiter.withControl {
    print("2")
  }
}

... still does not guarantee that "1" is printed before "2", because of how Task scheduling works. The order is only guaranteed if both of these tasks share the same isolation or executor.

Cancellation

If the operation inside the withControl closure is already running, the cancellation behavior of your code inside that closure applies. However, if you cancel a Task whose operation is still suspended because there was no free slot, a CancellationError is thrown. This is the reason why withControl currently cannot fully embrace typed throws.
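To make the second case concrete, here is a small sketch of cancelling a caller that is still waiting for a slot (illustrative only; in a real program the cancel could of course race with the caller reaching withControl):

let limiter = AsyncLimiter(limit: 1)

// Occupy the only slot for a while.
Task {
  try await limiter.withControl {
    try await Task.sleep(for: .seconds(5))
  }
}

// This caller has to wait for a free slot ...
let waiting = Task {
  do {
    try await limiter.withControl {
      print("never runs if cancelled while waiting")
    }
  } catch is CancellationError {
    print("cancelled before a slot became free")
  }
}

// ... and cancelling it while it is suspended makes withControl throw
// CancellationError without ever running the closure.
waiting.cancel()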

Priority

It can happen that you add an operation to AsyncLimiter from a Task with low priority, and later a Task with high priority also adds work to the same AsyncLimiter. Without special care, the high-priority task would have to wait for the low-priority task, resulting in priority inversion. However, with the help of the newly accepted priority escalation APIs, AsyncLimiter (permanently) boosts the low-priority task to high priority, essentially making it inherit the priority (note that this does not change the order of suspended tasks).

An example of how this would work:

let limiter = AsyncLimiter(limit: 1)
Task(priority: .low) {
  // prio: low
  try await limiter.withControl {
    // prio: low
    try await Task.sleep(for: .seconds(5))
    // prio: high
  }
}
// Sometime later
Task(priority: .high) {
  try await limiter.withControl {
    // do high prio stuff
  }
}

Final thoughts

This API could lead to deadlocks. Namely, when you do this:

let limiter = AsyncLimiter(limit: 1)

try await limiter.withControl {
  try await limiter.withControl {
    // ...
  }
}

I'm not sure if there is anything that could be done to prevent this other than documenting it; this behavior is similar to Mutex, though. Maybe an overload of withControl could be added, like withCheckedControl, that detects recursive use, but I have not thought a lot about this yet...
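Purely to sketch one possible direction (nothing like this exists in the package yet, and whether it should trap or throw is up for debate), a task-local set of limiter identities could be used to detect re-entrant use:

enum LimiterContext {
  @TaskLocal static var activeLimiters: Set<ObjectIdentifier> = []
}

func withCheckedControl<R: Sendable>(
  on limiter: AsyncLimiter,
  operation: @Sendable () async throws -> R
) async throws -> R {
  let id = ObjectIdentifier(limiter)
  // Trap instead of silently deadlocking when the current task is already
  // inside this particular limiter.
  precondition(
    !LimiterContext.activeLimiters.contains(id),
    "Recursive use of AsyncLimiter detected; this would deadlock"
  )
  var active = LimiterContext.activeLimiters
  active.insert(id)
  return try await LimiterContext.$activeLimiters.withValue(active) {
    try await limiter.withControl(operation: operation)
  }
}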

Because of how I implemented the task escalation behavior, withControl is currently O(n), where n is the number of currently suspended and active operations. Maybe something can be done to improve this to O(limit), but I have not thought much about this yet either.

Also, because of how I implemented the task escalation behavior, UnsafeCurrentTasks are stored outside of the withUnsafeCurrentTask closure. The documentation recommends against doing so, but I was not sure whether that applies to my implementation as well, because the UnsafeCurrentTask is removed after the operation has completed, so it cannot outlive its Task. Maybe someone can comment on whether this is actually safe to do, or whether another solution has to be implemented.
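Roughly, the pattern looks like this (a simplified sketch with illustrative names, assuming SE-0462's escalatePriority(to:) on UnsafeCurrentTask; the real implementation keeps this state behind a lock):

final class PendingOperation {
  private(set) var task: UnsafeCurrentTask?

  func begin() {
    // The handle escapes the withUnsafeCurrentTask closure here ...
    task = withUnsafeCurrentTask { $0 }
  }

  func escalate(to priority: TaskPriority) {
    // ... is only ever used while the operation is still pending
    // (via the SE-0462 escalation API) ...
    task?.escalatePriority(to: priority)
  }

  func finish() {
    // ... and is dropped as soon as the operation completes, so it can
    // never outlive the Task it refers to.
    task = nil
  }
}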

Finally, here is the package: GitHub - ph1ps/swift-concurrency-limiter. To try it out you will need the latest nightly (I tested it with the nightly from March 14th).

Thank you for reading. Looking forward to your feedback.


Nice work, looks pretty neat.

I would still want to solve this on the language level, and I hope we'll get there someday, but in the meantime this isn't the worst take on it :slight_smile:

That is a nice usage, nice to see you found uses for it already.

Yeah, although take care that that's pretty brittle, because default actors are not FIFO at all and a high-priority task can be processed before a low-priority task... So while "the task order" is not changed, if a high- and a low-priority task both arrive at a default actor and the low one was first, it can still be the case that the high one is processed before the low-priority one. We don't have FIFO guarantees in default actors today; they're allowed to pick a high-priority task from the queue before others.

That is the property you need to keep if this is to be safe; if you do that, you're fine, yes.


Good to know, let me know if I can be of any help.

Right, in my unit tests I had to implement a custom executor to get them running deterministically. Although that is not what I meant by the paragraph you quoted: I meant that the "boosting" mechanism's implementation in AsyncLimiter does not insert high-priority suspensions before low-priority suspensions, but rather keeps the order of entering the lock the same. It just escalates the low-priority tasks.
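For reference, something along these lines (a rough sketch with illustrative names, not the exact executor from my test suite; it simply funnels all jobs through a serial DispatchQueue):

import Dispatch

final class FIFOSerialExecutor: SerialExecutor {
  private let queue = DispatchQueue(label: "fifo-serial-executor")

  func enqueue(_ job: consuming ExecutorJob) {
    let unownedJob = UnownedJob(job)
    queue.async {
      unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
    }
  }

  func asUnownedSerialExecutor() -> UnownedSerialExecutor {
    UnownedSerialExecutor(ordinary: self)
  }
}

// An actor opting into this executor gets its jobs processed strictly in
// enqueue order, which makes the ordering tests deterministic.
actor FIFOActor {
  private let executor = FIFOSerialExecutor()

  nonisolated var unownedExecutor: UnownedSerialExecutor {
    executor.asUnownedSerialExecutor()
  }
}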

Yeah, I did want to clarify so people don't get the impression this achieves FIFO when actors are involved. A custom actor executor effectively disables the queue reordering and restores FIFO-ness; that's one way to achieve it today, that's right.
