Implementing queueing

I'm trying to wrap my head around all of the new Swift concurrency stuff and have been playing around with our networking stack to do so. Requests are currently passed to our network client along with a completion handler. The network client checks how many requests are currently in flight and enqueues the newly submitted request if we're already at the limit.

// Pseudocode
func load(
    request: URLRequest,
    completion: @escaping (Result<(URLResponse, Data), Error>) -> Void
)

Converting the actual underlying loading of the request is fairly straightforward: we could use URLSession.data(for:delegate:) or similar async methods and update the function signature to be async and return the Result:

func load(request: URLRequest) async -> Result<(URLResponse, Data), Error>
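For concreteness, a minimal sketch of that conversion (the NetworkClient name and the injected session are illustrative, not our actual stack):

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

// Sketch: the async version of load, wrapping URLSession's async API.
// `NetworkClient` and the injected session are illustrative names.
final class NetworkClient {
    private let session: URLSession

    init(session: URLSession = .shared) {
        self.session = session
    }

    func load(request: URLRequest) async -> Result<(URLResponse, Data), Error> {
        do {
            // data(for:) suspends the caller until the response arrives.
            let (data, response) = try await session.data(for: request)
            return .success((response, data))
        } catch {
            return .failure(error)
        }
    }
}
```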

However, I'm unsure of how to encapsulate the notion of "enqueue if we're already at max # of in-flight requests." Can somebody point me in the right direction? What would I return from the load function in that case?

I found this tweet to be most enlightening (-:

Share and Enjoy

Quinn "The Eskimo!" @ DTS @ Apple


Thanks @eskimo - I have a follow-up question on that sample code, though, as one part is a bit unclear to me:

actor ImageDownloader {

    private enum CacheEntry {
        case inProgress(Task.Handle<Image, Error>)
        case ready(Image)
    }

    private var cache: [URL: CacheEntry] = [:]

    func image(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.get()
            }
        }

        let handle = async {
            try await downloadImage(from: url) // the actor doesn't suspend here, but the async task might
        }

        cache[url] = .inProgress(handle)

        do {
            let image = try await handle.get()
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

Specifically, this part:

        case .inProgress(let handle):
            return try await handle.get()

shouldn't this also use do/catch, or alternatively use getResult and check the return value?

How does that work? If a task can return its value to multiple waiters, I would expect it to throw for all of them too.

First time I read through the code, it'd definitely have been helpful to have a comment that the await in the async task happens in a different context, so we won't suspend the actor there (just mentioning it in case someone else misses that; it's not yet natural/easy reading, simply due to not being used to the new concurrency model yet).
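On the multiple-waiters question above: as far as I can tell, a task computes its result once and hands the same value (or the same error) to every awaiter. A quick check using the shipping spelling (Task and .value rather than Task.Handle and .get()):

```swift
// Sketch: every awaiter of a failed task receives the same error.
enum DemoError: Error { case boom }

func demo() async -> Int {
    let handle = Task<Int, Error> {
        throw DemoError.boom
    }

    var failures = 0
    for _ in 0..<2 {
        do {
            _ = try await handle.value  // awaiting the same task twice
        } catch {
            failures += 1
        }
    }
    return failures  // both awaits rethrow the task's single error
}
```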

I had the same reaction. It took me a second to realize the await was okay because it's on another task.

Thanks @eskimo for that link! I think I understand how Task.Handle can be used to e.g. de-duplicate requests.

If I want to put a hard cap on the # of simultaneous downloads that can happen, though, how would that work? What would the implementation of downloadImage(from:) look like? When using completion handlers, you could grab the closure, put it in a simple struct with the request/URL, and store that in a queue.
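For comparison, the completion-handler version of that cap might be sketched like this (PendingDownload, the lock, and the simple FIFO policy are all my own illustrative choices, not our real code):

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

// Sketch of a completion-handler download queue with a hard cap.
// `PendingDownload` and the FIFO policy are illustrative.
final class DownloadQueue {
    private struct PendingDownload {
        let request: URLRequest
        let completion: (Result<Data, Error>) -> Void
    }

    private let maxConcurrent: Int
    private var inFlight = 0
    private var pending: [PendingDownload] = []
    private let lock = NSLock()

    init(maxConcurrent: Int) {
        self.maxConcurrent = maxConcurrent
    }

    func download(_ request: URLRequest,
                  completion: @escaping (Result<Data, Error>) -> Void) {
        lock.lock()
        if inFlight < maxConcurrent {
            inFlight += 1
            lock.unlock()
            start(request, completion: completion)
        } else {
            // At the cap: park the closure with its request in the queue.
            pending.append(PendingDownload(request: request, completion: completion))
            lock.unlock()
        }
    }

    private func start(_ request: URLRequest,
                       completion: @escaping (Result<Data, Error>) -> Void) {
        URLSession.shared.dataTask(with: request) { data, _, error in
            completion(error.map { .failure($0) } ?? .success(data ?? Data()))
            self.finishOne()
        }.resume()
    }

    private func finishOne() {
        lock.lock()
        if pending.isEmpty {
            inFlight -= 1
            lock.unlock()
        } else {
            // Hand this slot straight to the next waiter.
            let next = pending.removeFirst()
            lock.unlock()
            start(next.request, completion: next.completion)
        }
    }
}
```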

Edit:

In the example, the task handle is created by using async {}. Does that work not get fired off until somebody awaits it?

I think you are looking for withCheckedContinuation, which hands you a continuation (effectively a completion handler) that you can store and resume later.
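To make that concrete, here is one shape it could take (a sketch only; GatedDownloader, the slot-handoff policy, and the method names are assumptions on my part):

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

// Sketch: an actor that caps simultaneous downloads. Callers beyond the
// cap are parked as continuations and resumed when a slot frees up.
actor GatedDownloader {
    private let maxConcurrent: Int
    private var running = 0
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(maxConcurrent: Int) {
        self.maxConcurrent = maxConcurrent
    }

    func downloadData(from url: URL) async throws -> Data {
        await waitForSlot()
        defer { releaseSlot() }
        let (data, _) = try await URLSession.shared.data(from: url)
        return data
    }

    private func waitForSlot() async {
        if running < maxConcurrent {
            running += 1
            return
        }
        // Park this caller. The continuation is the stored "completion
        // handler"; releaseSlot() resumes it and hands over the slot,
        // so `running` is not incremented again here.
        await withCheckedContinuation { waiters.append($0) }
    }

    private func releaseSlot() {
        if waiters.isEmpty {
            running -= 1
        } else {
            waiters.removeFirst().resume()  // slot passes to the waiter
        }
    }
}
```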