Task's Forward Progress with sync code

Hello,

I am living in a world of joy with actors and my async code: it has made race-prone code reliable, I no longer have to debug odd corruption and hangs, and my code is a lot cleaner as a result. But I am facing an ugly challenge.

I have been using an idiom that is frowned upon in Swift (See Swift Concurrency: Behind the Scenes):

let semaphore = DispatchSemaphore(value: 0)
Task {
    await something()
    semaphore.signal()
}
semaphore.wait()

Not only is it frowned upon, but I have now experienced the deadlock in my own flesh, and I am at a loss as to what to do about it. The talk does not really offer a solution to this problem, and this year I thought I had seen a ray of light in the talk "Visualize and Optimize Swift Concurrency".

Now, I know what you are thinking "Miguel, just go and rewrite that in a different way, make your entire pipeline truly asynchronous". Let me explain why I have not been able to do this.

The above code is intended to fulfill a contract from a C API, where the C code invokes a callback to send some data over a connection. The contract expects the method to execute and return the number of bytes written. This is invoked implicitly via various APIs, as part of an existing actor.

So my implementation ends up looking like this:

// called within my first actor context
func send_callback(buffer: Data) -> Int {
    let semaphore = DispatchSemaphore(value: 0)
    var status = 0
    Task {
        // transport is a separate actor
        status = await transport.send(buffer)
        semaphore.signal()
    }
    semaphore.wait()
    return status
}

Here transport is my actor that ensures my serial access. This, sadly, deadlocks, and it is invoked deeply into the code of an actor.

The new Xcode at least was nice enough to tell me that there was some priority inversion going on here, where a higher-priority task was waiting on a lower-priority one. I tried things like using Task(priority: .high) and Task.detached; neither one helps me here.

The ray of light from the talk this year on visualizing concurrency, suggested that maybe I could get lucky if I used a DispatchQueue, so I created a DispatchQueue for this purpose, and then added a task inside (because my transport is an actor, and that is the only way I have found to call into it):

func send_callback(buffer: Data) -> Int {
    let semaphore = DispatchSemaphore(value: 0)
    var status = 0
    myQueue.async {
        Task {
            status = await transport.send(buffer)
            semaphore.signal()
        }
    }
    semaphore.wait()
    return status
}

I am not married to my semaphore, but I cannot figure out any other way to wait for the task to complete. Awaiting Task.value requires an async context, which I do not have.

7 Likes

Why do you prefer a semaphore to DispatchQueue.sync? Neither is any safer than the other (it is just as easy to get deadlocks); the code is simply shorter without semaphores.

Do you actually use the returned number of bytes sensibly, or just the fact that this number is either equal or unequal to the number of bytes requested?

I would suggest that you... yes, rewrite that code, but you have already heard that.

If you can't make the underlying C code that's calling you use a proper asynchronous pattern, then you don't have much choice except to go ahead and block while making sure that you're called on a thread that's not part of a limited thread pool. But we really cannot add anything to Swift concurrency that makes this easier, or else people will start using it ubiquitously.

3 Likes

What is your transport and how is it organised?

Let me show you where I am heading with this line of questioning :face_with_monocle:. Assume TCP for a moment. You write N bytes and the protocol returns "no error" or "N bytes written". Does that mean the bytes have already reached the destination? In fact not; they probably have not even left your computer, just been buffered locally, and eventually they will be sent (e.g. once a certain number of bytes is accumulated or once some delay has passed). And if a connection error did occur, does that mean that all bytes previously reported as "sent" (while in fact just "buffered") actually reached the destination? No. Some of them might have, and you don't know how many (unless you dig into internal TCP state or ask the other side).

Nothing stops you doing exactly the same with your transport: the client code sends 10 bytes, and you say "yes sir, 10 bytes have been sent" (while in fact they were just buffered). In your case perhaps not in some contiguous memory buffer but as:

URLSession.shared.dataTask... blah.. blah
    <- the bytes are somewhere here at this point
{
    result in     <- this is not called yet
    ...
}

or even:

queue.async
    <- bytes are somewhere here at this point, we are not yet on the queue
{
    <- this is not called yet, so the "sent" is not done yet
    URLSession.shared.dataTask... blah.. blah {
        <- and we are not here yet either
    }
}

(I am deliberately using callback based syntax in this example for clarity.)

20 bytes sent again? "Fine", you say, "20 bytes sent" (while in fact "buffered").

Once the bytes are "actually sent", at that point you'll have the result: for simplicity, let's assume it is either "success" or "error". If "success", nothing to worry about; proceed as usual. If "error", record that error into a variable. Next time your API's "send" or "receive" is called, check that error variable, and if it holds an error, return it back without sending (i.e. buffering) the bytes.

This way you've just solved your blocking dilemma by ... doing no blocking at all! And you solved it by returning speculative, optimistic "ok"s and buffering, even though you didn't write a single line of code to implement an explicit memory buffer. :partying_face::clinking_glasses::champagne:
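A minimal sketch of this error-latching idea, under my own assumptions (NonBlockingSender and the transport closure are illustrative names, not code from the original post):

```swift
import Foundation

// Hypothetical sketch: report success optimistically, send in the
// background, and surface any failure on the *next* synchronous call.
final class NonBlockingSender {
    private let lock = NSLock()
    private var lastError: Error?   // latched failure from an earlier send
    private let transport: @Sendable (Data) async throws -> Void

    init(transport: @escaping @Sendable (Data) async throws -> Void) {
        self.transport = transport
    }

    func send(_ buffer: Data) -> Int {
        lock.lock()
        let failed = lastError != nil
        lock.unlock()
        if failed { return -1 }      // surface the earlier failure now
        Task { [transport] in
            do { try await transport(buffer) }
            catch {
                self.lock.lock()
                self.lastError = error
                self.lock.unlock()
            }
        }
        return buffer.count          // "sent" (really: buffered)
    }
}
```

The synchronous caller never blocks: `send` returns the byte count immediately, and a failed background send only shows up as an error on a later call, exactly as described above.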

Having said that, I don't know the specifics of your task and transport (e.g. is your sending back pressure controlled?), etc, so this approach might not work for you.

2 Likes

Hello friends,

When I posted earlier this morning about my travails with using async code inside a sync API, I did so in an act of desperation, after various days of trying all sorts of unsavory hacks and workarounds, none of which I am at liberty to describe, as they could get me expelled from the industry and the programmers' union.

After posting that, I came up with a solution, which is remarkably close to what @tera suggested in the second post. Essentially, I was able to rely on this callback being allowed to return EAGAIN so that the operation is tried again, and I could use a tiny state machine to sort this out. I am now a happy camper and can move on to greener pastures.
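For the curious, here is a rough sketch of the idea; the names and the exact EAGAIN handling are illustrative assumptions about the C contract, not my production code:

```swift
import Foundation

let EAGAIN = 11   // illustrative errno-style "try again" value

// A tiny state machine: the sync callback returns -EAGAIN while the
// async send is in flight, and the real byte count once it completes.
final class RetryingSender {
    private enum State { case idle, inFlight, done(Int) }
    private var state = State.idle
    private let lock = NSLock()

    func sendCallback(_ buffer: Data,
                      send: @escaping @Sendable (Data) async -> Int) -> Int {
        lock.lock(); defer { lock.unlock() }
        switch state {
        case .idle:
            state = .inFlight
            Task {
                let n = await send(buffer)
                self.lock.lock()
                self.state = .done(n)
                self.lock.unlock()
            }
            return -EAGAIN          // ask the C caller to retry later
        case .inFlight:
            return -EAGAIN          // still waiting
        case .done(let n):
            state = .idle
            return n                // finally report the real result
        }
    }
}
```

No semaphore, no blocking: the callback just reports "try again" until the actor hop has finished.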

That said, I believe that this situation is unfortunate. I do appreciate the desire to establish great coding practices via this shared thread pool, but I believe there should be room in Swift for a way of creating new Tasks that are bound to a fresh thread/dispatch queue, for situations where folks cannot get as lucky as I did today.

The prospect I faced earlier today was to drop my actor code and go back to the old dispatch queues and manual locking, which would have been a step backwards, and I hope that some compromise in this design can help other senior programmers keep the precious few hairs they have left.

Best,
Miguel.

7 Likes

@migueldeicaza
Here is some quite interesting research related to your topic

1 Like

Hi Miguel, nice to see you here!

This is what we aim to allow with Task Executors, which are now partially implemented on main (the APIs are still wiggling around a little bit) and under review over here: SE-0417: Task Executor Preference

They'll allow you to start a Task on a specific TaskExecutor, and the executor is then preferred instead of the global shared pool. A prime use case for those is isolating some blocking code you cannot rewrite onto a bunch of dedicated threads/queues that you encapsulate with this TaskExecutor and at least then all the bad blocking code is isolated to this pool of threads, rather than hurting the width-limited default one.
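As a rough sketch of what that could look like under the proposal as currently written (the API is under review and subject to change; DedicatedThreadExecutor is an illustrative name):

```swift
import Foundation

// Sketch based on the SE-0417 proposal text: a TaskExecutor that runs
// each job on its own freshly detached thread, keeping blocking work
// off the width-limited shared pool. Not production code.
final class DedicatedThreadExecutor: TaskExecutor, @unchecked Sendable {
    func enqueue(_ job: consuming ExecutorJob) {
        let unownedJob = UnownedJob(job)
        let unownedExecutor = self.asUnownedTaskExecutor()
        Thread.detachNewThread {
            unownedJob.runSynchronously(on: unownedExecutor)
        }
    }
}

let blockingPool = DedicatedThreadExecutor()

Task(executorPreference: blockingPool) {
    // Blocking calls here run on the dedicated thread,
    // not on the shared cooperative pool.
}
```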

Hope this helps!

If you can't really wait for these, you could somewhat achieve a similar result by using a custom actor executor (swift-evolution/proposals/0392-custom-actor-executors.md at main · apple/swift-evolution · GitHub): make a SerialExecutor, dedicate a queue or thread to it, and make all your blocking calls from an actor which uses this executor. You could of course have many actors share the same executor or each have their own, etc. Not as great as the task executors for this purpose, but it would also help keep the blocking away from the shared pool while still letting you use async/await.
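For example, with Swift 5.9 (where DispatchSerialQueue conforms to SerialExecutor) that could look roughly like this; BlockingTransport and the queue label are illustrative names:

```swift
import Dispatch

// Sketch: an actor pinned to its own serial dispatch queue via a
// custom actor executor (SE-0392), so any blocking calls inside it
// stay off the shared cooperative thread pool.
actor BlockingTransport {
    private let queue = DispatchSerialQueue(label: "blocking-transport")

    nonisolated var unownedExecutor: UnownedSerialExecutor {
        queue.asUnownedSerialExecutor()
    }

    func send(_ buffer: [UInt8]) -> Int {
        // This body runs on `queue`; blocking here does not
        // starve the default pool.
        dispatchPrecondition(condition: .onQueue(queue))
        return buffer.count
    }
}
```

Callers still use plain `await transport.send(...)`; only the thread the actor runs on changes.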

6 Likes