Blocking I/O and concurrency

Actually, you don't even need to spin up a thread to wait for completions – using io_uring_register_eventfd() alongside dispatch_source_create() allows integration with libdispatch, i.e. you can register a handler to be called whenever completion events are available rather than blocking in io_uring_wait_cqe().
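For anyone curious what the eventfd half of that looks like from Swift, here is a minimal Linux-only sketch. It does not touch io_uring at all: it assumes the descriptor would normally be passed to io_uring_register_eventfd(), and simulates the kernel by writing to the eventfd itself. `CounterBox` and `waitForEventfdSignal()` are names made up for this illustration.

```swift
import Dispatch
import Glibc

/// Holds the value read in the handler; access is ordered by the semaphore below.
final class CounterBox: @unchecked Sendable {
    var value: UInt64 = 0
}

func waitForEventfdSignal() -> UInt64 {
    // With io_uring, this descriptor would be handed to
    // io_uring_register_eventfd() so the kernel signals it on completions.
    let fd = eventfd(0, 0)
    precondition(fd >= 0, "eventfd() failed")

    let box = CounterBox()
    let fired = DispatchSemaphore(value: 0)
    let queue = DispatchQueue(label: "example.completions")
    let source = DispatchSource.makeReadSource(fileDescriptor: fd, queue: queue)

    source.setEventHandler {
        // Reading resets the eventfd counter. Real code would drain the
        // completion queue here instead of blocking in io_uring_wait_cqe().
        var counter: UInt64 = 0
        if read(fd, &counter, MemoryLayout<UInt64>.size) == MemoryLayout<UInt64>.size {
            box.value = counter
            fired.signal()
        }
    }
    source.setCancelHandler { _ = close(fd) }
    source.resume()

    // Assumption for the demo: stand in for the kernel posting one completion.
    var one: UInt64 = 1
    let written = write(fd, &one, MemoryLayout<UInt64>.size)
    precondition(written == MemoryLayout<UInt64>.size, "write() failed")

    fired.wait()
    source.cancel()
    return box.value
}
```

The semaphore is only there so the demo can return a value; in a real integration the handler would simply dispatch completions and nothing would block.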

Still got some nasty bugs to fix but it's getting there, and io_uring really is a natural fit for Swift concurrency.

An echo server can be implemented as simply as a loop that iterates on an AsyncSequence of file descriptors returned by accept(), followed by recv() and send(). Nice!

4 Likes

This is amazing and frustrating because the io_uring family of APIs is not available on macOS. :confused:

1 Like

Well I am feeling the self-satisfaction you promised :joy:

2 Likes

Glad you tried it out and seem to like it ;-)

Here's an example usage for a TCP echo server. The IORing library presents a file descriptor-based async/await interface to io_uring; IORingUtils has a few helpers for managing socket lifetimes, parsing addresses, etc, but is not part of the core API.

import AsyncExtensions
import Foundation
import Glibc
import IORing
import IORingUtils

@main
public struct IORingTCPEcho {
    private let socket: Socket
    private let ring: IORing
    private let bufferSize: Int

    public static func main() async throws {
        guard CommandLine.arguments.count == 2,
              let port = UInt16(CommandLine.arguments[1])
        else {
            print("Usage: \(CommandLine.arguments[0]) [port]")
            exit(1)
        }

        let echo = try IORingTCPEcho(port: port)
        try await echo.run()
    }

    init(port: UInt16, bufferSize: Int = 32, backlog: Int = 128) throws {
        self.bufferSize = bufferSize
        ring = try IORing(depth: backlog)
        socket = try Socket(domain: AF_INET, type: SOCK_STREAM.rawValue, protocol: 0)
        try socket.setNonBlocking()
        try socket.setReuseAddr()
        try socket.setTcpNoDelay()
        try socket.bind(to: sockaddr_in.any(port: port))
        try socket.listen(backlog: backlog)
    }

    func echo(client: Socket) async {
        do {
            repeat {
                let data = try await client.recv(count: bufferSize, ring: ring)
                try await client.send(data, ring: ring)
            } while true
        } catch {
            // A failed recv() or send() ends this client's echo loop;
            // the error is deliberately ignored.
        }
    }

    func run() async throws {
        let clients = try await socket.accept(ring: ring)
        for try await client in clients {
            Task { await echo(client: client) }
        }
    }
}
1 Like

This is amazing and frustrating because the io_uring family of APIs is not available on macOS. :confused:

Well, if it's just the async/await API you're after, FlyingSocks is pretty nice. Or you could wrap dispatch_io_read() and dispatch_io_write() pretty easily.

But yes, it would be nice to have it on macOS. Umesh, where are you? ;)

2 Likes

I haven't got around to trying out SPI yet, which was my original reason for investigating io_uring, but I've replaced FlyingSocks with IORingSwift in my application (for Linux builds), and it seems to be working nicely. The downside, of course, is the lack of portability: my application needs an abstraction layer to work on both macOS and Linux. SwiftNIO exists, but I think it might be a bit too heavyweight for my embedded use case.

For SPI I'll probably want to use fixed buffers (io_uring_prep_write_fixed() and friends), which requires a bit of thinking about how to expose this to Swift in a safe manner without making any additional copies (which would defeat the purpose). I need to meditate on this a bit.

1 Like

Getting this working reliably has been challenging! For those playing along at home (and as a note to my future self), I'll document my experience below.

Firstly, I naively assumed that non-actor-isolated asynchronous functions invoked by an actor inherited the actor's execution context. (See SE-0338, which explains this is not the case.) I had wrapped io_uring_submit() in a helper class which was thus invoked on a generic executor – this caused a race which manifested as partially initialised submissions being submitted, and completions being invoked multiple times.

Another issue to be careful of is the ambient nature of the io_uring_submit() API. Whilst it's probably an inevitable artefact of io_uring being, well, a ring, it requires care when used in reentrant contexts. io_uring_submit() will submit all outstanding entries that have been acquired with io_uring_get_sqe(). This means you cannot have a suspension point between acquiring an SQE and preparing it for submission. (Technically you can have one between preparing it and calling io_uring_submit(), at least if you’re not using linked SQEs.)
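To make the "no suspension point between acquire and prepare" rule concrete, here is a toy model in pure Swift. It is only a sketch and uses no io_uring calls: `ModelRing`, `ModelEntry` and `runModel()` are hypothetical names, with an array standing in for the submission queue. Because enqueue() is an actor-isolated method with no `await` in its body, no other task can run submit() on the actor between the "get" and the "prep" steps.

```swift
/// Hypothetical stand-in for a submission queue entry; nil means "not yet prepared".
struct ModelEntry {
    var opcode: String? = nil
}

actor ModelRing {
    private var pending: [ModelEntry] = []
    private(set) var submitted = 0
    private(set) var submittedUnprepared = 0

    /// Acquire and prepare in one synchronous step. There is no `await` here,
    /// so submit() cannot interleave between the two lines.
    func enqueue(opcode: String) {
        pending.append(ModelEntry())                  // models io_uring_get_sqe()
        pending[pending.count - 1].opcode = opcode    // models io_uring_prep_*()
    }

    /// Models io_uring_submit(): flushes every acquired entry, prepared or not.
    func submit() {
        for entry in pending {
            submitted += 1
            if entry.opcode == nil { submittedUnprepared += 1 }
        }
        pending.removeAll()
    }
}

/// Runs many tasks that each enqueue and then submit, and reports the counters.
func runModel(tasks: Int = 100) async -> (submitted: Int, unprepared: Int) {
    let ring = ModelRing()
    await withTaskGroup(of: Void.self) { group in
        for i in 0..<tasks {
            group.addTask {
                await ring.enqueue(opcode: "op-\(i)")
                await ring.submit()
            }
        }
    }
    return (await ring.submitted, await ring.submittedUnprepared)
}
```

If the two lines in enqueue() were split across separate `await`ed calls (or moved into a non-isolated async helper), another task's submit() could flush the half-initialised entry, which is the failure mode described above.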

In my initial implementation, I used withCheckedContinuation() to bridge io_uring completions to async/await; however, being a suspension point, it was possible for io_uring_submit() to be called re-entrantly before the continuation was associated with the SQE. The solution was to register a callback immediately which would await the completion handler posting the result on an AsyncThrowingChannel. (Perhaps Swift concurrency wizards can suggest a better data structure for one-shot events? I could use a continuation, but I don't want to busy-wait in onCompletion() for its allocation.)

After watching this excellent video, I've learned that withCheckedContinuation immediately executes its body and only suspends on return, so the issue I described in the previous paragraph (now struck out) was probably just another manifestation of the race condition.

final class SingleshotSubmission<T>: Submission<T> {
  weak var group: SubmissionGroup<T>?

  private typealias Continuation = CheckedContinuation<T, Error>
  private var continuation: Continuation!

  // initialiser removed for space

  func submit() async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
      // guaranteed to run immediately
      self.continuation = continuation
      Task { try await ring.submit() }
    }
  }

  override func onCompletion(cqe: io_uring_cqe) async {
    do {
      try continuation.resume(returning: throwingErrno(cqe: cqe, handler))
    } catch {
      continuation.resume(throwing: error)
    }
  }
}
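On the one-shot question raised earlier: one possible shape, sketched below, is a small lock-protected box that tolerates either ordering, i.e. the completion arriving before or after the waiter has stored its continuation. This is not how IORingSwift does it; `OneShot` and `demoOneShot()` are made-up names, and the sketch assumes wait() and complete(_:) are each called at most once.

```swift
import Foundation

/// Hypothetical one-shot rendezvous between a completion callback and one waiter.
final class OneShot<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<T, Error>?
    private var result: Result<T, Error>?

    /// Suspends until complete(_:) is called; returns at once if it already was.
    func wait() async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            // The body runs synchronously, before the caller suspends.
            self.register(continuation)
        }
    }

    /// Called from the completion path. Never blocks waiting for a continuation.
    func complete(_ result: Result<T, Error>) {
        lock.lock()
        let waiter = continuation
        continuation = nil
        if waiter == nil { self.result = result }   // nobody waiting yet: park the result
        lock.unlock()
        waiter?.resume(with: result)
    }

    private func register(_ continuation: CheckedContinuation<T, Error>) {
        lock.lock()
        let early = result
        if early == nil { self.continuation = continuation }
        lock.unlock()
        if let early = early { continuation.resume(with: early) }
    }
}

/// Exercises both orderings: completion before wait, and completion after.
func demoOneShot() async throws -> (Int, Int) {
    let early = OneShot<Int>()
    early.complete(.success(1))
    let first = try await early.wait()

    let late = OneShot<Int>()
    Task { late.complete(.success(2)) }
    let second = try await late.wait()
    return (first, second)
}
```

The continuation is always resumed outside the lock, so a waiter that wakes immediately cannot contend with the completion path.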

PS. The irony is I probably wouldn't have triggered the race condition were it not for an unrelated bug in my application code that resulted in it sending considerably more packets than intended.

3 Likes