Async design challenge

Hello folks,

I am consuming a C library (libssh2) in an iOS app of mine. It works great, but I realized that my current design will not work well with multiple SSH streams. Async could help me solve this problem, but I have not found much information on how to do it.

Let me explain the challenge with a simplified picture.

For the purpose of this discussion, the APIs that I need to use are not thread-safe, so I cannot have two threads calling into them in parallel. This is not much of a problem, as I can use a queue for it.

To prevent blocking, invoking certain APIs will return the equivalent of a Unix syscall EAGAIN, which means that the operation should be retried in the future (once more data is available). Before I tried to use multiple streams per connection, this would have been enough:

    func readFile (path: String) -> String {
      var ret = 0
      var buffer = ""
      repeat {
         ret = libssh2_do_reading (path, &buffer);
      } while ret == LIBSSH2_ERROR_EAGAIN
      return buffer
    }

There are two problems with this approach:

  • It consumes the entire thread while waiting for data to be delivered, so no other operations on other channels can process or send any data, because the queue I have designated to call into libssh2 is in use.
  • It is effectively a busy loop, waiting for data from the network that might or might not arrive. Until data arrives to fulfill the request, that while loop will be killing glorious cycles from my precious iPhone. How can you ask an electron to be the last one to die for a pointless operation?

So I figured that maybe async could solve this problem for me, but I am approaching with my .NET baggage, and can’t seem to find my way around Swift’s async.

What I would like to do is to keep the async function suspended until I know that there is new data available, to avoid the busy loop, but to add insult to injury, I would want this to resume execution in its designated queue.

I figured I could write the above like this:

    func readFile (path: String) async -> String {
      var ret = 0
      var buffer = ""
      repeat {
         ret = libssh2_do_reading (path, &buffer)
         if ret == LIBSSH2_ERROR_EAGAIN {
            await Task.yield ()
         }
      } while ret == LIBSSH2_ERROR_EAGAIN
      return buffer
    }

This, I imagine, is a mild improvement, but the documentation for Task.yield says merely that it will let other folks get in on the fun. What I would ideally want is something similar to .NET, where the function returns a Task that can be signaled externally for execution to be resumed.

A spiritual version of what I would like to accomplish would be something like this (Task.suspend and task.resume do not exist; they are the APIs I wish I had):

    var pingable_tasks: [Task] = []
    
    func readFile (path: String) async -> Task<String> {
      var task: Task
      task = Task {
        var ret = 0
        repeat {
           ret = libssh2_do_reading (path)
           if ret == LIBSSH2_ERROR_EAGAIN {
              Task.suspend ()
           }
         } while ret == LIBSSH2_ERROR_EAGAIN
         pingable_tasks.remove (task)
         return buffer;
       }
       pingable_tasks.append (task)
       return task
    }
    
    // Elsewhere, when we have received data from the network, I can ping all 
    // tasks to let them know they should try to get some data
    func dataReceived () {
        for task in pingable_tasks {
           task.resume ()
        }
    }

Now, the above is incorrect, as I used Task { to launch a new task, with no respect for the sanctity of calling libssh2 from a single thread at a time. I guess if worse comes to worst, I could wrap the entire libssh2 API in an actor, but that does feel a bit over the top. What I would love is a way to ensure that the Task is always scheduled on a specific queue.

To me, that suggests you want to use withUnsafeContinuation. That'll give you the continuation for the task, which you can then feed a value externally when your outer event loop sees there's more data to be read.

On the subject of making the task stay on a specific queue, what are you trying to achieve specifically by that? Is it that only one task can be calling into libssh2 at any point in time? Using a global actor might be the best way to model that. If libssh2 can only be used from a specific thread, then @MainActor-constraining it might be the way to go, given the tools we have currently.
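
A minimal sketch of the global-actor idea, for readers who have not used one. The names `SSHActor`, `FakeSession`, and `drain` are hypothetical, and `FakeSession` is only a stand-in for state that a real libssh2 wrapper would hold:

```swift
// Hypothetical global actor; the name SSHActor is illustrative.
@globalActor
actor SSHActor {
    static let shared = SSHActor()
}

// Stand-in for a non-thread-safe session. It is a class (not an actor), so it
// can still be subclassed; the annotation isolates all of its members.
@SSHActor
class FakeSession {
    private var pending: [UInt8]

    init(pending: [UInt8]) { self.pending = pending }

    // Stand-in for a library call that must never run concurrently.
    func readByte() -> UInt8? {
        pending.isEmpty ? nil : pending.removeFirst()
    }
}

// Also isolated to SSHActor, so the whole loop runs without interleaving
// with other SSHActor-isolated code.
@SSHActor
func drain(_ session: FakeSession) -> [UInt8] {
    var out: [UInt8] = []
    while let byte = session.readByte() { out.append(byte) }
    return out
}
```

Code outside the actor reaches these with `await`, for example `let session = await FakeSession(pending: [1, 2, 3])` followed by `let bytes = await drain(session)`.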

I, too, would love a way to suspend a task manually and resume it upon receiving some signal. Having implemented a busy-loop (with Task.yield) in just the past couple days, I can tell you that your instincts are correct: it is bad for CPU, and thus, battery.

Could you explain a bit more? I have the following code:

func waitForQOS() async throws {
    while self.qos?.isSendingAdvised() == false {
        try Task.checkCancellation()
        await Task.yield()
    }
}

This is called from within a detached Task which in turn loops over a queue.

How could I restructure this to avoid the busy-loop?

I'm taking for granted that self.qos has some sort of traditional callback-based mechanism for signalling when sending becomes advised. If that is the case, then you can do something like:

extension QOS {
  func waitUntilSendingIsAdvised() async {
    let _: Void = await withUnsafeContinuation { cc in
      self.sendingAdvisedCompletion = {
        // Clear the callback so that continuation doesn't get resumed twice
        self.sendingAdvisedCompletion = {}
        // Resume the task
        cc.resume(returning: ())
      }
    }
  }
}

func waitForQOS() async throws {
  await self.qos?.waitUntilSendingIsAdvised()
  ...
}
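
For anyone who wants to run the pattern end to end, here is a self-contained sketch. `QOSGate` and `markSendingAdvised` are hypothetical stand-ins, not part of the code being discussed. It uses `withCheckedContinuation`, which traps on a double resume, and it also covers the case where sending is already advised before the wait starts:

```swift
import Foundation

// Hypothetical stand-in for the QOS object; not an API from this thread.
final class QOSGate: @unchecked Sendable {
    private let lock = NSLock()
    private var advised = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    // Suspends until sending is advised; resumes at once if it already is.
    func waitUntilSendingIsAdvised() async {
        await withCheckedContinuation { (cc: CheckedContinuation<Void, Never>) in
            lock.lock()
            if advised {
                lock.unlock()
                cc.resume()
            } else {
                // Parked until markSendingAdvised() runs.
                waiters.append(cc)
                lock.unlock()
            }
        }
    }

    // Called by whatever callback learns that sending is now advised.
    func markSendingAdvised() {
        lock.lock()
        advised = true
        let toResume = waiters
        waiters.removeAll()
        lock.unlock()
        // Resume outside the lock so resumed tasks never contend with it here.
        for cc in toResume { cc.resume() }
    }
}
```

The lock is only held for a few instructions inside the synchronous continuation closure, never across an `await`.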

Thank you! I'll try that and see how it goes. I've been following the Swift Concurrency proposals and threads, but I've only been getting my hands dirty for the past week or so.

Hello,

Thanks for the pointer, Joe. The name of the function gave me a couple of leads, and some searches on GitHub helped me put together this hack-ish version of what I need (this sidesteps the issue of the single queue).

Posting both as a solution for future folks, and hoping someone can tell me if this is too gross, or if I made a mistake:

import Foundation

// Data produced
var data: [Int] = []
var lock: NSLock = NSLock ()

// List of pending continuations that will be invoked when new data is produced
var conts: [()->Bool] = []

func broadcast () {
    lock.lock()
    let copy = conts
    conts = []
    lock.unlock()
    
    //print ("broadcasting to \(copy.count)")
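    for c in copy {
        // c() calls mread(), which takes the lock itself, so it must run unlocked
        let remove = c ()
        if !remove {
            // conts is shared with doRead2, so guard the re-append
            lock.lock()
            conts.append(c)
            lock.unlock()
        }
    }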
}

var state = 1

func produce () {
    while true {
        lock.lock ()
        data.append (state)
        state += 1
        print ("Have \(data.count)")
        lock.unlock ()
        broadcast ()
        sleep (2)
    }
}
Task { produce () }

// Simulates an API that returns -1 to indicate "try again"
func mread () -> Int {
    lock.lock ()
    iterations += 1
    if data.count > 0 {
        let v = data.removeFirst()
        lock.unlock()
        return v
    }
    lock.unlock()
    return -1
}

// Just to keep track of how many times mread is called
var iterations = 0

// Busy loop version of doRead
func doRead () async -> Int {
    while true {
        let v = mread ()
        if v != -1 {
            return v
        }
 
    }
}

// async version with withUnsafeContinuation
func doRead2 (idx: Int) async -> Int {
    return await withUnsafeContinuation { c in
        // The return value indicates if the operation produced a value
        let op: ()->Bool = {
            let v = mread ()
            if v == -1 {
                return false
            }
            c.resume (returning: v)
            return true
        }
        if !op() {
            lock.lock()
            conts.append (op)
            lock.unlock()
        }
    }
}

Task {
    for x in 0..<4 {
        //let v = await doRead ()
        let v = await doRead2 (idx: x)
        print ("At \(x) got \(v)")
    }
    print ("Total iterations: \(iterations)")
}
sleep (10)

As for the queue: libssh2 is not thread-safe, so only one caller can be invoking methods at a given point [1]. I wanted to avoid actors, because they do not allow subclasses, and right now my design has a base class "Session" that wraps the basics, a "SocketSession" that uses sockets as its transport, and hopefully soon a proxied version for restoring connections that will be layered on top of an existing one. I can certainly redo this design if this is the only way of ensuring only one thread is calling into libssh2 at a given time.

Best,
Miguel.

[1] Actually, that is an oversimplification: libssh2 allows some parallelism, but only for independent sessions. In this scenario, everything is on the same session, but using multiple channels.

If you don't like the idea of modeling every SSH session as its own actor, you could also try a model where an actor represents the session, and individual channels are modeled using a class hierarchy. With concurrency type-checking enabled (-warn-concurrency on recent builds), the compiler should prevent non-Sendable class instances from being reachable by more than one actor, so you can be confident that objects are only being used from a single actor. Taking locks from async code for long durations of time is best avoided, because that will block a thread in the pool, so if you can use actors for mutual exclusion, that will scale better in the Swift async model.
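
A minimal sketch of that split, with hypothetical names (`Session`, `Channel`, `ShellChannel`) and no real libssh2 calls. The actor owns the channel objects and only hands out integer identifiers, so the class instances are never reachable from outside it:

```swift
// Hypothetical channel hierarchy: ordinary classes, so subclassing works.
class Channel {
    let id: Int
    init(id: Int) { self.id = id }
    func kind() -> String { "generic" }
}

final class ShellChannel: Channel {
    override func kind() -> String { "shell" }
}

// Hypothetical session actor. It owns every channel and never hands one out,
// so channel objects are only ever touched while running on this actor.
actor Session {
    private var channels: [Int: Channel] = [:]
    private var nextID = 0

    // Creates a channel and returns only its Sendable identifier.
    func openShellChannel() -> Int {
        let id = nextID
        nextID += 1
        channels[id] = ShellChannel(id: id)
        return id
    }

    // All work on a channel happens inside the actor.
    func kind(ofChannel id: Int) -> String? {
        channels[id]?.kind()
    }
}
```

A caller would write `let id = await session.openShellChannel()` and then refer to the channel by `id` in later calls.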

Isn't it possible for the producer to broadcast after a read but before you can add the next continuation? Then you would miss that broadcast.
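
One way to close that window is to do the "is there data?" check and the enqueue of the continuation inside the same critical section. The sketch below is an assumption-laden variant, not the code posted above: `Inbox` is a hypothetical class, and it hands each value directly to one waiter instead of asking every waiter to poll again:

```swift
import Foundation

// Hypothetical replacement for the data/conts globals; names are illustrative.
final class Inbox: @unchecked Sendable {
    private let lock = NSLock()
    private var data: [Int] = []
    private var waiters: [CheckedContinuation<Int, Never>] = []

    // Consumer side: the emptiness check and the enqueue share one critical
    // section, so produce() cannot slip in between them.
    func read() async -> Int {
        await withCheckedContinuation { (c: CheckedContinuation<Int, Never>) in
            lock.lock()
            if data.isEmpty {
                waiters.append(c)
                lock.unlock()
            } else {
                let value = data.removeFirst()
                lock.unlock()
                c.resume(returning: value)
            }
        }
    }

    // Producer side: hand the value straight to a waiter if there is one,
    // otherwise buffer it for the next read().
    func produce(_ value: Int) {
        lock.lock()
        if waiters.isEmpty {
            data.append(value)
            lock.unlock()
        } else {
            let c = waiters.removeFirst()
            lock.unlock()
            c.resume(returning: value)
        }
    }
}
```

Whichever side gets the lock first, the value is either buffered before the reader checks or delivered to a reader that is already parked, so no wake-up is lost.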

One other thing: broadcast() is actually the one doing the read, by calling c() here, which is the op created in doRead2(), and op is the one that ends up calling mread(). So your task is actually only creating a block and enqueuing it, but not doing the actual read. Or am I missing something here?

I call the op in doRead2, just after creating it, to determine whether to add to the conts array or not. I did that to avoid code duplication - would love to find a way that avoids creating that lambda function, but also that doesn’t accidentally lead to two code paths with subtle errors/differences over time.

Oh yeah, I did miss that op() call. But you're still trying to read from your producer code as well. Rather, wouldn't you want to only call the continuation and let the task do the reading?

You mean my call to broadcast() from the producer? That implementation is purely to showcase "new data is available, and one of the consumers might want to read it", which just happens to be modeled after what I have to do in the libssh2 code. In that model, essentially, data is received from the network and then demultiplexed to the various clients. Sadly, there is no way of directly resuming one task; instead, libssh2 today relies on everyone polling for results (that said, there is an issue open on libssh2 to address this problem, but I am modeling what I have).

I mean: produce() -> broadcast() -> c() -> mread(). I'm still confused about whether this is what you want, calling mread() from the producer. I would have expected you only want to call mread() from the consumer tasks.

This doesn't technically answer your question but it would allow you to side-step it.

If you implemented the SSH library using swift-nio-ssh then you'd have a much easier time :). And you'd gain support for Network.framework out of the box if you also support swift-nio-transport-services. Having the connections be opened by Network.framework will make everything else more robust and you'll properly support VPNs etc.

Hello,

The broadcast() in this case is a "Hey everyone, more data is available, check if you can resume execution", so it is normal that it would take place there.

I had forgotten about this! When I looked at it, months ago, I believe it explicitly avoided some features, which was good for security but bad for connecting to existing servers. Does that ring a bell? Are there some features or servers that might not be supported? I could swear I read that somewhere before, and I cannot find it on that repo now.

My usage of libssh2 already uses NWConnection as the transport, so I should in theory get those benefits.

Thanks for pointing to it, I will make a branch and explore this approach for my app.

It does, I think you're referring to RSA crypto? Not sure what the exact state is there today, ping @lukasa. SwiftNIO SSH uses Swift Crypto internally and that only gives you access to modern, harder-to-use-wrong crypto.