How do delay the response to an async method?

I have an actor with a method like the following:

func downloadRequested(_ request: DownloadRequest) async throws -> Response {
    let download = try await self.download(for: request)
        
    self.transferIdMap[download.transferId] = download.itemId
        
    let size = try Int64(download.dataProvider.size())
        
    return (download.transferId, size, download.originalFilename, download.dataProvider.mimeType)
}

The problem is that incoming requests need to be rate-limited. I'd like to delay execution of the body of this method when there are too many outstanding requests, and return a future of some kind to the caller, so it can maintain its async/await flow.

Is there a way to do this using structured concurrency?

1 Like

You can use Task.sleep() to suspend the task for (at least) a certain amount of time.

I could busy loop with yield and/or sleep, but that doesn't seem like that should be the only available solution.

How do you know when you have waited long enough for the rate limiting? Maybe a continuation is what you need.

There is a processing loop involved that can dequeue the next waiting request when a download completes. I tried a continuation, but the method body needs to be async, and it seems you can't use continuations with async code.

The body that you want to pass to withCheckedContinuation is an async function? I wonder if you can break it down further to get around that. I think we're at the point where seeing some of your actual code would be helpful.

The code in the OP is the actual code, with unnecessary bits (logging, mostly) stripped out.

Here's the caller: it's invoked by a protobuf service.

func createDownload(request: TransferSignaling_CreateDownloadRequest,
                    done: WRpcCallback<TransferSignaling_CreateDownloadResponse>)
{
    Task {
        do {
            guard let response = try await self.downloadCoordinator?.downloadRequested((request.itemID,
                                                                                        request.thumbnail,
                                                                                        request.mediaType,
                                                                                        request.requestType))
            else {
                return
            }
                
            done.run(response: TransferSignaling_CreateDownloadResponse.with {
                $0.transactionID = response.0
                $0.size = response.1
                $0.fileName = response.2
                $0.mimeType = response.3
            })
                
            await self.downloadCoordinator?.responseSent(transferId: response.0)
        }
        catch {
            DLog.log("downloadRequested failed: \(error)")

            done.fail(reason: "file not found")
        }
    }
}

You can certainly just have an async function that returns a future. But what do you expect callers to do with this future? Would it be better to offer the caller an opportunity to provide a callback to handle exceptional but non-fatal conditions, and then the API itself only returns/throws when the whole operation is complete?

I ended up going a different route for my requirement, but I am interested in learning more about Swift Concurrency. I think I understand how to return a future: that's just returning a Task, which can be await'ed at the caller's convenience, correct?

What I am missing is a way to create a Task, or task-like thing, that doesn't execute right away. That would have solved my problem as I originally conceived it. Does such a mechanism exist?

1 Like

You could create an async closure and pass that, which can later be run in a Task. I suppose you can extend Combine’s Future type to accept such a closure which runs inside a Task. For example:

extension Future where Failure == Error {
  convenience init(_ closure: @escaping () async throws -> Output) {
      self.init { promise in
          Task {
              do {
                  let value = try await closure()
                  promise(.success(value))
              } catch {
                  promise(.failure(error))
              }
          }
      }
  }
}

Now you can wrap your work in a closure, create a Future using it and pass that. Then later its value can be awaited.

let future = Future(asyncClosure)
// Pass the future to another function or whatever
let value = try await future.value
1 Like

I’m not sure I understand your goal very well, but if you just want to process requests at some specific rate, then you could create a downloader async sequence and zip that to a timer. I haven’t really thought this through; I’m just throwing that out there as a possibility. From my understanding, if no download requests are available, the downloader would await on producing its next element and nothing would happen. As for the timer, it would produce values at a fixed rate, hence capping off how many requests can be processed and thus zipped at a given time. The underlying concurrency mechanism is Task.select (essentially a task group that returns its first result to complete).

2 Likes

The requirement is to not process the incoming request at all while X (currently defined as 5 in the app) are currently being processed. Without concurrency, I'd use an OperationQueue here.

You can accomplish this with a TaskGroup and manually enqueuing child tasks. I have an example here that's talking about working around a performance issue, but would also do what you want here.

I think a simple AsyncQueue actor that manages an array of Operations (@Sendable () async -> Void) could be written that accomplishes what you need. I think this is similar to what OperationQueue offers, but utilizes Swift concurrency:

actor AsyncQueue {
    static let shared = AsyncQueue()
    
    let maxConcurrentOperationCount: UInt?
    let taskPriority: TaskPriority?
    
    init(taskPriority: TaskPriority? = nil, maxConcurrentOperationCount: UInt? = nil) {
        self.maxConcurrentOperationCount = maxConcurrentOperationCount
        self.taskPriority = taskPriority
    }
    
    private var inProgress: [Task<Void, Never>] = []
    private var backlog: [@Sendable () async -> Void] = []
    
    func submit(_ operation: @escaping @Sendable () async -> Void) {
        if let maxConcurrentOperationCount = maxConcurrentOperationCount {
            if inProgress.count < maxConcurrentOperationCount {
                start(operation)
            } else {
                backlog.append(operation)
            }
        } else {
            start(operation)
        }
    }
    
    private func start(_ operation: @escaping @Sendable () async -> Void) {
        let task = Task(priority: taskPriority, operation: operation)
        inProgress.append(task)
        Task {
            await task.value
            inProgress.remove(at: inProgress.firstIndex(of: task)!)
            checkCurrentOperationCount()
        }
    }
    
    private func checkCurrentOperationCount() {
        guard let maxConcurrentOperationCount = maxConcurrentOperationCount else { return }
        while inProgress.count < maxConcurrentOperationCount && !backlog.isEmpty {
            start(backlog.removeFirst())
        }
    }
    
    func cancelAll() {
        backlog.removeAll()
        inProgress.forEach { $0.cancel() }
        inProgress.removeAll()
    }
    
    func waitAll() async {
        while !inProgress.isEmpty || !backlog.isEmpty {
            await Task.yield()
        }
    }
}
1 Like