Using `async` functions from synchronous functions (and breaking all the rules)

Hi all,

At the moment in Vapor we're undertaking a big piece of work to further adopt Swift Concurrency. The reasons are outlined in the post but the TL;DR is that we're making it easier for us to adopt to Sendable and making it safer for those using async/await by migrating some of the internals to actors etc.

This presents us with a problem however as we can't break our public API. We're deprecating a lot of the older APIs but many things that are now synchronous will be now asynchronous (because of actor access) so to avoid us duplicating all of Vapor's internals, if something is now an actor, we'd like to be able to call these functions from the older synchronous functions. I know we're not supposed to but is there a way that we can?

Up to now we've been using semaphores to enable us to call an async function from a synchronous context, e.g.:

public func asyncShutdown() async {
   // Code that shuts down something, calls a mixture of synchronous and asynchronous stuff
}

@available(*, deprecated, message: "Migrate to `asyncShutdown()`")
public func shutdown() {
    let semaphore = DispatchSemaphore(value: 0)
    Task {
        await self.asyncShutdown()
        semaphore.signal()
    }
    semaphore.wait()
}

Now I know we shouldn't do this but it allows us to forcefully move users over with warnings without breaking our public API and their builds. If there are better ways then I'm all ears!

The problem comes when we need to use something that's returned from the async function. E.g.

let semaphore = DispatchSemaphore(value: 0)
Task {
    ??? = await self.getSomethingFromStorage()
    semaphore.signal()
}
semaphore.wait()

Because the underlying storage is now an actor, it has to be an async call. And we can't assign a variable outside, e.g.

var renderer: ViewRenderer?
let semaphore = DispatchSemaphore(value: 0)
Task {
    guard let makeRenderer = await self.views.asyncStorage.makeRenderer else {
        fatalError("No renderer configured. Configure with app.views.use(...)")
    }
    // Error - Mutation of captured var 'renderer' in concurrently-executing code
    renderer = makeRenderer(self)
    semaphore.signal()
}
semaphore.wait()

This righty fails because this isn't a safe assignment. So is there a way we can achieve this, even if very hacky!

Thanks!

1 Like

Well, for starters, you can make a single generic function to do this. And then, yeah, within that function you can use unsafe features to escape the isolation box and move the value out of your task.

Are there any examples/docs for doing this? I've had a dig around the docs and various Swift proposals but couldn't see anything that would fit. Thanks!

You can use a class to box the returned value. Something like this works (with all the safety caveats mentioned):

fileprivate class Box<ResultType> {
	var result: Result<ResultType, Error>? = nil
}

/// Unsafely awaits an async function from a synchronous context.
@available(*, deprecated, message: "Migrate to structured concurrency")
func _unsafeWait<ResultType>(_ f: @escaping () async throws -> ResultType) throws -> ResultType {
	let box = Box<ResultType>()
	let sema = DispatchSemaphore(value: 0)
	Task {
		do {
			let val = try await f()
			box.result = .success(val)
		} catch {
			box.result = .failure(error)
		}
		sema.signal()
	}
	sema.wait()
	return try box.result!.get()
}

(You can also make a non-throwing version if needed. I don't think there's a way to make rethrows work in this way).

Then can call:

// in a sync context
let data = try _unsafeWait {
	let (data, _) = try await URLSession.shared.data(from: url)
	return data
}
4 Likes

With all the usual caveats around it being very very bad to do, yeah a DispatchSemaphore wrapped piece like this should do the job.

It's pretty bad to do this in a server since you quickly can end up starving the global pool since handling many concurrent incoming requests... So I think we're really in need to get custom executors so we can put those blocking/horrible hacks operations onto a dedicated "do bad things here" executor, so we don't risk starving everything else in the server. At least the actual IO and timeouts are still going to be off on the side on NIO threads, so it's not all gloom and doom, but still quite problematic.

I'd probably suggest trying to chip away at some of the warnings this would cause, e.g. making the ResultType: Sendable and the fileprivate final class _UncheckedBox: @unchecked Sendable to get a few warnings less, and keep it to this one _blockingWait function. It's not "unsafe" in the memory sense, so perhaps _blocking might be a better prefix for such nasty methods.

10 Likes

Huh, don't know why I didn't try a class to wrap it in - thanks!

Is there a better thing that's available to use?

Not really…

For something better to exist you’d want to isolate the “blocking” on a different pool than the global one but we can’t do that with async/await until custom executors are a thing.

That’s a common problem and solution e.g. in akka where apps in real world would very often end up calling into synchronous db drivers etc. you might like this post I did years ago which visualizes the blocking on the “global pool” vs on the side. scala - Akka HTTP: Blocking in a future blocks the server - Stack Overflow (There's some thread usage screenshots which explain the problem very well in there, so I keep using it as an example, even many years after hah) We don’t have it as bad in Vapor since the event loop handling the server actually is outside of the swift concurrency world, so at least you won’t starve the entire server.

I wonder if we could hack together something the existing but not official APIs for custom executors right now… wdyt @John_McCall, worth trying to look into it or not quite yet?

1 Like

If it is not possible to remove deprecated synchronous apis, then is that not the sign that Vapor internals can not switch to Swift concurrency yet?

It is still possible to expose public async apis by wrapping internals with continuations. Using Swift concurrency in Vapor internals would then have to wait for the next major version where synchronous apis can be removed for good.

If you want to "force" early async, take care a semaphore will deadlock if an inner async async job must run on the blocked thread. For example, blocking on the main thread would deadlock on the first @MainActor job.

Maybe it's possible to look for inspiration from RunLoop or the implementation of XCTestCase.wait(for:timeout:)?

3 Likes

Unfortunately we're stuck between a rock and a hard place on this. We could remove the deprecated APIs but then we would need another major release in the not too distant future with Swift 6 and NIO 3, which is too disruptive for the community. And we can't keep using the old internals because it's not safe - we're already seeing issues caused with async/await because many of the assumptions of Vapor are that everything is run on the same thread which isn't the case anymore, so we need to use actors etc to make that safe. We also want to use async/await as much as possible in the whole stack otherwise adopting distributed tracing is not going to be possible.

So it's entirely a problem that we could solve but it would be more disruptive in the long term for the ecosystem to do the 'right' thing now.

To be clear - you'll get deprecation warnings when you update to whatever with the new release is and if you fix the warnings there will be no use on semaphores in any of the code you touch - it's purely there to stop us breaking the API for those who can't update their code yet but we are going to strongly suggest you migrate before deploying

How about not using actors? e.g. serialising to a private queue:

class SharedData {
  private var value = 0
  private var queue = DispatchQueue(label: "SharedData_Private")

  func setValue(_ newValue: Int) {
    queue.sync { self.value = newValue }
  }

  func setValue(_ newValue: Int) async {
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
      queue.async {
        self.value = newValue
        continuation.resume()
      }
    }
  }
}

This pattern would ensure that synchronous calls are serialised on a new, private thread (not from the global pool), while async callers who are on pooled threads can yield those execution resources while they wait (rather than block).

It's a bit ugly - it's basically implementing your own actor, but it at least gives you the control you need to ship an async API without a new major version and without resorting to private interfaces. Eventually you could replace it with the language's provided actors.

4 Likes

As always, "custom executors" means a lot of different things to different people, so it's important to spell out what you specifically want. Here I think you're saying that, if you have a synchronous blocking function that you want to expose as async, you'd like to be able to force it to execute on a different thread pool (which may or may not be width-limited) in order to avoid creating thread-exhaustion problems in the standard width-limited global pool. Presumably that thread pool would not be acting semantically like an actor in any way, though.

2 Likes

We use the class wrapper trick in swift-nio too. You just need to add @unchecked Sendable to the type declaration to really disable sendable checking.

1 Like

Apologies for the late reply here, @John_McCall!

Wanted to prepared a detailed writeup but then got pulled into some work stuff... :sweat_smile:

Right, worth being more specific here. You got the use-case right: "run specific work off the default pool, and instead on some other specified pool" is the gist of it.

While perhaps worth a separate thread (maybe we can split this out if worth discussing right now? or wait out until we're ready to deep dive into those). I wanted to write up some of what would help addressing this problem though, even if just as food-for-thought:


1. First, one approach here is to make specific actor run on specific thread pool, by overriding the unownedExecutor.

// Now, I'll admit that this is a bit unusual (to me at least, my brain still very used to how akka does this) in Swift actor's current shape where the actor gets the serial execution semantics from the executor: the executor it has must be a SerialExecutor and that's where we get the serial actor execution semantics from.
I'm a bit more used to treating the "executor" (or "dispatcher") as a "dumb" thread pool and the actor's implementation making sure it only submits "run X now" to the dumb pool/dispatcher/executor which just runs things that are given to it. But I'll do my best to stick to Swift wording for consistency!

One approach that helps here is to allow specific actors to declare where they should run all their tasks. This is not unlike Swift's actors and the main actor which happens to use the main thread.

// let's say, in my system I want to dedicate 4 threads (made up number)
// to blocking tasks; and I'll want to make sure all blocking work is done
// on those, rather than on the global pool:
let dedicatedToBlockingTasksPoo = ThreadPoolExecutor(threads: 4)
// This is NOT a SerialExecutor, it is an Executor though:
//     public protocol Executor: AnyObject, Sendable {
//         func enqueue(_ job: UnownedJob)
//     }

Next, I can have any number of actors which I know will be doing blocking, and I make them all use this pool:

actor Blocker { 
  let resource: Resource
  
  let unownedExecutor: UnownedSerialExecutor
  // overrides: 
  // nonisolated var unownedExecutor: UnownedSerialExecutor { get }

  init(r: Resource, executor: ThreadPoolExecutor) { 
    self.resource = r 
    self.unownedExecutor = executor.unownedSerialExecutor // "give me an actor (Serial) executor onto this thread pool"
  }

  // async, but we'll do this work on the blocking executor, good.
  func blockingWorkOnResource() async { <do blocking (e.g. IO) on Resource> }
}

So this is nice, because all methods on this actor would hop to the dedicated pool, and it can do whatever kind of blocking on the resource it needs to do, without blocking the shared global width-limited pool.

This is exactly "the akka way"; where we'd tell people to dedicate some threads for their IO, and run their e.g. database calls on such. In Scala/Akka, the "dispatcher" can be used for either Futures, or actors, since it's just a thread pool, the pattern looks like this:

// config file
my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}

Which is then given to actors:

// scala / akka, just to examplify the pattern:
context.spawn(<my actor>, "name", .fromConfig("my-blocking-dispatcher"))

// this is equivalent to in Swift:
<MyActor>("name", <the dispatcher>)

But also they can be passed to Futures (so equivalents of our Task {}):

// scala / akka, just to examplify the pattern:
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

Future { Thread.sleep(10000) } // uses blockingDispatcher implicitly
// equivalent to:
// Future({ Thread.sleep(10000) }) (blockingDispatcher)
// https://www.scala-lang.org/api/2.13.x/scala/concurrent/Future.html

Which brings us to the second part, running specific tasks on specific pools:

2. Sometimes it is useful to run a specific task on a specific pool / executor, without having to go all the way to make an actor for it.

Though perhaps we could say we don't do that in Swift, and instead resort to global actors, and one would declare a global MyIOActor, give it the executor like we did in 1. and call it a day? Then we could:

Task { @MyIOActor in blocking() }

which could be nice as well...?

Otherwise, an approach would be to pass an executor to a Task:

Task(runOn: myBlockingExecutor) { ... }

The global actor approach may be worth exploring though...

3. And since we want to push developers to use child tasks whenever possible, they may also need this ability.

In TaskGroups the API can be rather easily extended to provide "where to run the child tasks":

TaskGroup(of: Int.self, childTaskExecutor: myBlockingExecutor) { group in 
  group.addTask { /*uses myBlockingExecutor*/ }

  group.addTask(on: otherExecutor) { /*uses otherExecutor*/ }
}

So we could have this way to kick off some blocking tasks in a structured fashion.

It gets more complicated with async let, where we probably would need to resort to some scope pattern?

Executors.startChildTasks(on: blockingExecutor) {  
  async let x = { io() on blockingExecutor }
}

// or we'd have to invent some other way to annotate, maybe ideas like
async let x = { @IOActor in ... } // could be a thing...?

could be one way to achieve this... though we'd likely have to carry this executor preference in a task-local, (or make other space for it).


So overall, it all boils down to having to call some blocking code, in various situations, and making sure this code won't execute on the global pool.

Visually, (taken from an ancient writeup I did over here) we never want to have any blocking work on the "default" pool: (cyan=blocking/sleeping, green=running, orange = waiting), like the following bad situation visualizes:


(default pool is completely saturated by sleeps/blocking)

but instead, we'd want to have a world where all the bad things still are happening, but on their own executor/pool separated from the global pool:


(default pool is not busy at all, ready to serve requests, but blocking pool is busy sleeping blocking). This is better since it allows the server to remain responsive and reply with 500 or timeouts or do whatever else it needs to be doing...

Not sure how helpful the writeup is, but I figured I could collect a bunch of ideas and requests regarding this to give the discussion some more concretes -- though also happy to delay diving deeper until we're ready to discuss custom executors more etc.

10 Likes

Even if this was possible, this could only address the OP's need (blocking a sync function until an async one has completed) provided what you call the "specific work" never awaits on jobs that happen to run on the thread that is blocked by the waiting OP's synchronous function.

I don't see how this can be guaranteed, how it is possible to avoid deadlocks.

Even if one "derails" some tasks with alternative executors, those tasks may await on other async functions that must run their jobs on the default pool, or a specific thread, or use continuations in order to wrap god knows what.

The main thread is the first example that comes to my mind. If I run, from the main thread, a sync function that blocks until a Task ends, then any @MainActor or DispatchQueue.main that happens somewhere during the Task's execution will deadlock.

One could imagine a technique to avoid dealocks: only allow sync functions to wait for an async one when they are started from special threads: threads that are guaranteed to never be used by async functions.

But such functions could not be called from async functions. Such functions could not be called from the main thread either. This would just ruin the OP's intent, which is to define public sync apis that wrap async ones (with the ability to freely call them).

Am I right saying that deadlocks are lurking everywhere in this whole forum thread?

@MainAction func f() {
    // It will be very difficult for the Vapor team to guarantee
    // that their async implementation of `doStuff` NEVER
    // happens to schedule something on the main thread,
    // directly or indirectly through third-party or system libraries.
    let x = Vapor.syncDoStuff() // block and cross fingers
}

The API I've been mulling for non-actor custom executors is just to provide some sort of async API that switches to the executor, like:

extension Executor {
  func run<T>(operation: () async throws -> T) async rethrows -> T
}

The target executor would then be used in place of the default global executor for the current task for the duration of the scope. That is, whenever control entered a non-isolated async function, i.e. the situation where Swift would normally switch to the global executor, it would instead switch to the target executor.

I think the right behavior is for this to also be picked up by structured subtasks and Task {}s created within the scope, but of course not by Task.detached {}.

I'm not sure what should happen if you call this on a serial executor, or if that should just be a precondition failure. Also the name run is possibly poorly-chosen.

2 Likes

I doit see the interest in letting the OP, and other readers of their thread, in a state of expectation.

And should I dare : not answering reasonable doubts (sorry for taking the time to write them down above) is also a strategy I can not understand.

This was the only reasonable (from which one can reason) that was provided so far, and no single people in charge of Swift concurrency has provided any answer to this.THIS is also something I can not understand.

The Vapor team is in dire straits, this is something I can understand. But hand-waving answers is not a correct answer to maintainers of such an important library.

1 Like

I've read the thread carefully but I can't see it: can you clarify what question is being ignored or hand-waved away?

So far as I can see the original Vapor ask was answered up-thread.

1 Like

I guess I have been awkward in my wording, sorry for that. It looks to me that the answer to the initial Vapor ask was "sure, you can do it, with unsafe features". But considering the current uncertainty of the Vapor team regarding their thread-safety, I would expect the initial optimistic answer to be tampered. I'm wary that @0xTim thinks that blocking a thread until an async function completes is a good idea - when all I can see are opportunities for deadlocks, and just more uncertainty on top of the current one.