Bridging legacy DispatchQueue code to async/await API design question

Hi guys,

I have watched all the Swift concurrency related WWDC videos, played with a lot of code, and successfully upgraded many of my legacy packages to use async/await.

There is one missing piece that I am struggling with, and I am looking for a "best practice" kind of answer.

Imagine an API like Network.framework or AVCaptureSession, or any other component that does some background work on its workerQueue (also used for internal synchronisation) and requires API consumers to specify a callbackQueue on which to invoke delegate methods.

Typically the workerQueue is created inside the class.

The callbackQueue needs to be specified externally.

The AVCaptureSession does this for example via setSampleBufferDelegate(_:queue:).

The Network.framework does this for example via start(queue:).

Now I have been following this pattern myself for years and it works great.

Imagine now that you want to create an async/await wrapper around these existing classes. You will use the nice withCheckedThrowingContinuation to invoke a call and resume the continuation on the delegate callback.

All working nice.
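As a concrete sketch of that wrapper pattern: assume a hypothetical legacy `Connection` class (the name and its completion-handler API are made up for illustration) whose delegate-style callback fires on the caller-supplied queue. The async variant just resumes the continuation from that callback:

```swift
import Dispatch

// Hypothetical legacy class: does its work and reports the result through
// a completion handler invoked on a caller-supplied callback queue.
final class Connection {
    func start(queue: DispatchQueue, completion: @escaping (Error?) -> Void) {
        queue.async { completion(nil) }  // pretend the connection succeeded
    }
}

extension Connection {
    // Async wrapper: the continuation is resumed from the delegate callback.
    func start(queue: DispatchQueue) async throws {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            self.start(queue: queue) { error in
                if let error {
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume()
                }
            }
        }
    }
}
```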

But there is still the callbackQueue that needs to be specified. But where should that come from?

Looks like it does not matter much for the async variants, so I can just create a serial queue internally, use it for callbacks, and hide it from the async API consumer.

Or should I expose it to the API caller? It looks to me like a newcomer to async/await would not know what to do with it.

Here is example server code (which I am not sure is valid) that accepts incoming connections and receives packets from each connection, implemented via AsyncStream.

let server = Server(preferredPort: 1234)
let callbackQueue = DispatchQueue(label: "serverCallbackQueue")
try await server.start(queue: callbackQueue)

Task {
    for await x in server.peers {
        // handle incoming peer connection
        // this code will be executing on the callbackQueue 
        Task {
            for await packet in x.packets {
                // process incoming packet data
                // this code will also be executing on the callbackQueue 
            }
            // peer disconnected
        }
    }
}
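For reference, a minimal sketch of how such a `Server.peers` stream might be produced internally, yielding straight from the legacy callback queue (the `Server` name, the worker queue, and the plain-String "peer" payload are all assumptions for illustration):

```swift
import Dispatch

// Hypothetical Server whose incoming peers are exposed as an AsyncStream.
final class Server {
    private let workerQueue = DispatchQueue(label: "server.worker")
    private var peerContinuation: AsyncStream<String>.Continuation?

    // Each peer is represented as a plain String for brevity.
    private(set) lazy var peers: AsyncStream<String> = AsyncStream { continuation in
        self.peerContinuation = continuation
    }

    // Simulates the legacy delegate callback firing on the worker queue;
    // yield(_:) merely schedules the consumer on the cooperative pool.
    func simulateIncomingPeer(named name: String) {
        workerQueue.async {
            self.peerContinuation?.yield(name)
        }
    }
}
```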

The imaginary API requires me to specify the callbackQueue although from the pure Swift Concurrency perspective I feel this should not be required.

So the question is:

What is the best practice for bridging the Swift Concurrency world with existing APIs that require a callback dispatch queue?

thanks for any recommendations, clarifications
Martin

Try using main for the callbackQueues.

As a side note...

Looks like we've traded one "pyramid of doom" for another :thinking:

That is obviously not a great idea, as it automagically ties all the code to the main queue (and therefore to @MainActor), which can cause unnecessary main thread contention.

I would love to have the ability to default to a callbackQueue that would represent, say, the current actor.

But since actors and queues are unrelated, there is not an easy way to achieve this.

Or is it?
Martin

One way you could solve this problem is to turn those DispatchQueue-based Objective-C classes into proper actors. The DispatchSerialQueue class conforms to the SerialExecutor protocol, which you can use as the actor's executor. You can then use the queue for calling into DispatchQueue-based Objective-C APIs and handle their logic in normal actor-isolated functions, which are guaranteed to run on the same DispatchSerialQueue. Optionally, if you find that creating a separate DispatchSerialQueue for every individual service is overkill, you can turn it into a GlobalActor and use that instead. Either way, you'll have Swift's actor isolation functionality at your disposal to help prevent data races while avoiding unnecessary queue hopping.
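A minimal sketch of that idea, assuming the DispatchSerialQueue-as-SerialExecutor support introduced with Swift 5.9 (macOS 14 / iOS 17 on Apple platforms); the `ConnectionService` name is made up:

```swift
import Dispatch

// Actor whose executor is a DispatchSerialQueue, so actor-isolated code and
// queue-based legacy callbacks run on the very same serial queue.
actor ConnectionService {
    private let queue = DispatchSerialQueue(label: "connection.worker")

    nonisolated var unownedExecutor: UnownedSerialExecutor {
        queue.asUnownedSerialExecutor()
    }

    func start() {
        // Actor isolation and the queue coincide: no hop is needed when
        // calling legacy APIs that expect to run on `queue`.
        dispatchPrecondition(condition: .onQueue(queue))
    }
}
```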

2 Likes

Just did this yesterday for interaction with such a legacy API. I found two complications, despite this being a great way to address the issue: first, on Apple platforms this is limited to the latest OS versions, so on earlier ones you need to write a custom executor (a minor inconvenience); second, the actor assumption isn't working correctly, so if there are delegates to implement, the code becomes a bit odd.

1 Like

second, the actor assumption isn't working correctly, so if there are delegates to implement, the code becomes a bit odd.

The assume APIs will handle more situations shortly.
This is what [Accepted] SE-0424: Custom isolation checking for `SerialExecutor` was about.

While I can’t promise back deployment, it might be possible, as it seems the payoff would be rather large.

1 Like

Thanks all for the insightful responses. I think I may have found a reasonable answer actually.

Invoking continuation.resume or continuation.yield actually prepares a code block to be invoked later on the cooperative pool, so it does not matter much where the callback code itself executes; it only hops through the callback queue on its way to the cooperative pool anyway.

So I have enhanced my legacy APIs to allow the callback queue to be skipped entirely and to invoke continuation.resume or continuation.yield straight from the internal worker queue.

That way the legacy code remains synchronised and protected by the internal worker queue, and all async code is scheduled from that worker queue to be executed later on the cooperative pool.

The more I look into this, the more it looks like the easy and correct solution.
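A sketch of that enhancement, using a hypothetical `LegacyTransport` (all names are assumptions): the completion handler fires on the internal worker queue, and the continuation is resumed right there, with no callbackQueue in sight.

```swift
import Dispatch

// Hypothetical legacy class whose internal worker queue both synchronises
// state and invokes completion handlers directly.
final class LegacyTransport {
    private let workerQueue = DispatchQueue(label: "transport.worker")

    // Legacy callback-based API: the completion runs on workerQueue.
    func send(_ message: String, completion: @escaping (Result<Int, Error>) -> Void) {
        workerQueue.async {
            completion(.success(message.count))  // pretend we sent the bytes
        }
    }

    // Async wrapper: no callbackQueue parameter; resume(with:) merely
    // schedules the continuation on the cooperative pool.
    func send(_ message: String) async throws -> Int {
        try await withCheckedThrowingContinuation { continuation in
            self.send(message) { result in
                continuation.resume(with: result)
            }
        }
    }
}
```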

1 Like

I did exactly the same. I described this technique in the linked thread below. The feedback was quite reassuring.

1 Like

It depends. Consider the following example:

import Foundation

actor A: NSObject, URLSessionDelegate {
    let url: URL
    var session: URLSession!

    init(url: URL) {
        dispatchPrecondition(condition: .onQueue(.main))
        self.url = url
        super.init()
        precondition(OperationQueue.main.underlyingQueue == DispatchQueue.main)
        session = URLSession(configuration: .default, delegate: self, delegateQueue: .main)
    }
    
    func load() {
        dispatchPrecondition(condition: .notOnQueue(.main))
        session.dataTask(with: URLRequest(url: url)) { data, response, error in
            dispatchPrecondition(condition: .onQueue(.main))
            precondition(data != nil && error == nil)
            // self.decode(data!) can't do that
            Task {
                await self.decode(data!)
            }
        }.resume()
    }
    
    func decode(_ data: Data) {
        dispatchPrecondition(condition: .notOnQueue(.main))
        // some lengthy decoding here
        print("done")
    }
}

class C {
    let a: A
    
    init() {
        a = A(url: URL(string: "https://a url of your choice")!)
        Task {
            await a.load()
        }
    }
}

let c = C()
RunLoop.current.run(until: .distantFuture)

(I am using URLSession to make it more self-contained, and I am deliberately mixing async/await and callbacks to illustrate the point below.)

Note that even though I specified the callback to be called on the main queue (and it is indeed called on the main queue, as verified by the preconditions), I can't just proceed and call "self.decode(data!)" there – that's an actor-isolated instance method, so it's a warning now and an error in Swift 6. I have to call it with await inside a Task, and once I'm in that call I am no longer on the main queue (or any other explicitly specified queue), so the actual amount of time spent on the main queue is minute, perhaps even unmeasurable.

If you still care about that:

  1. measure the time loss / increased CPU load on the main thread. This step is important: if you do any optimisation, you'll be able to compare the two and figure out whether that optimisation is worth doing.

  2. introduce a "main" dispatch queue of your own, say myMain, and use that instead in those instances. I'm pretty sure it will be mostly empty most of the time.

And in those instances where the API in question allows passing nil as the queue (or defaults to it, meaning "some default queue") – pass nil.
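For illustration, such a private "main-like" delegate queue might be wired up for URLSession like this (myMain is of course a made-up name; the FoundationNetworking import is only needed on Linux):

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

// A private serial queue standing in for DispatchQueue.main, so delegate
// callbacks stay off the real main thread.
let myMain = DispatchQueue(label: "my.main")

let delegateQueue: OperationQueue = {
    let q = OperationQueue()
    q.underlyingQueue = myMain
    q.maxConcurrentOperationCount = 1  // keep it serial, like the real main queue
    return q
}()

// URLSession will now invoke its delegate callbacks on myMain.
let session = URLSession(configuration: .default,
                         delegate: nil,
                         delegateQueue: delegateQueue)
```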