Actually, you don't even need to spin up a thread to wait for completions: using io_uring_register_eventfd() alongside dispatch_source_create() allows integration with libdispatch, i.e. you can register a handler to be called whenever completion events are available rather than blocking in io_uring_wait_cqe().
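Roughly, the eventfd plumbing could look something like the sketch below. This is a minimal, illustrative sketch only: it assumes liburing is imported through a hypothetical C shim module (called CIOURing here), and the real IORing internals differ.

import CIOURing // hypothetical C shim module exposing liburing
import Dispatch
import Glibc

func installCompletionHandler(ring: UnsafeMutablePointer<io_uring>) -> DispatchSourceRead {
    // Create an eventfd and ask the kernel to signal it whenever a CQE is posted.
    let efd = eventfd(0, Int32(EFD_NONBLOCK))
    precondition(efd >= 0, "eventfd() failed")
    io_uring_register_eventfd(ring, efd)

    // A libdispatch read source fires whenever the eventfd becomes readable,
    // so no thread ever blocks in io_uring_wait_cqe().
    let source = DispatchSource.makeReadSource(
        fileDescriptor: efd,
        queue: DispatchQueue(label: "io_uring.completions")
    )
    source.setEventHandler {
        // Drain the eventfd counter, then reap everything in the completion queue.
        var counter: UInt64 = 0
        _ = read(efd, &counter, MemoryLayout<UInt64>.size)

        var cqe: UnsafeMutablePointer<io_uring_cqe>?
        while io_uring_peek_cqe(ring, &cqe) == 0, let cqe {
            // ... route cqe.pointee.res back to whoever submitted the SQE ...
            io_uring_cqe_seen(ring, cqe)
        }
    }
    source.resume()
    return source
}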
Still got some nasty bugs to fix but it's getting there, and io_uring really is a natural fit for Swift concurrency.
An echo server can be implemented as simply as a loop that iterates over an AsyncSequence of file descriptors returned by accept(), followed by recv() and send(). Nice!
Here's an example usage for a TCP echo server. The IORing library presents a file descriptor-based async/await interface to io_uring; IORingUtils has a few helpers for managing socket lifetimes, parsing addresses, etc., but is not part of the core API.
import AsyncExtensions
import Foundation
import Glibc
import IORing
import IORingUtils
public struct IORingTCPEcho {
    private let socket: Socket
    private let ring: IORing
    private let bufferSize: Int

    public static func main() async throws {
        guard CommandLine.arguments.count == 2,
              let port = UInt16(CommandLine.arguments[1])
        else {
            print("Usage: \(CommandLine.arguments[0]) [port]")
            exit(1)
        }
        let echo = try IORingTCPEcho(port: port)
        try await echo.run()
    }

    init(port: UInt16, bufferSize: Int = 32, backlog: Int = 128) throws {
        self.bufferSize = bufferSize
        ring = try IORing(depth: backlog)
        socket = try Socket(domain: AF_INET, type: SOCK_STREAM.rawValue, protocol: 0)
        try socket.setNonBlocking()
        try socket.setReuseAddr()
        try socket.setTcpNoDelay()
        try socket.bind(to: sockaddr_in.any(port: port))
        try socket.listen(backlog: backlog)
    }

    func echo(client: Socket) async {
        do {
            repeat {
                let data = try await client.recv(count: bufferSize, ring: ring)
                try await client.send(data, ring: ring)
            } while true
        } catch {
            // client disconnected (or some other error occurred); drop the connection
        }
    }

    func run() async throws {
        let clients = try await socket.accept(ring: ring)
        for try await client in clients {
            Task { await echo(client: client) }
        }
    }
}
I haven't got around to trying out SPI yet, which was my original reason for investigating io_uring, but I've replaced FlyingSocks with IORingSwift in my application (for Linux builds), and it seems to be working nicely. The downside, of course, is the lack of portability: my application needs an abstraction layer to work on both macOS and Linux. SwiftNIO exists, of course, but I think it might be a bit too heavyweight for my embedded use case.
For SPI I'll probably want to use fixed buffers (io_uring_prep_write_fixed() and friends), which requires a bit of thinking about how to expose them to Swift in a safe manner without making any additional copies (which would defeat the purpose). I need to meditate on this a bit.
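For reference, the raw liburing flow looks roughly like the sketch below, again assuming a hypothetical CIOURing shim module. The hard part, which this doesn't attempt, is designing a Swift-facing type for the registered region that callers can't copy or let escape.

import CIOURing // hypothetical C shim module exposing liburing
import Glibc

struct FixedBufferError: Error {}

// Register a single buffer with the kernel, then issue a write that refers to
// it by index, avoiding per-operation page pinning and extra copies.
func writeFixed(ring: UnsafeMutablePointer<io_uring>, fd: Int32, count: Int) throws {
    let buffer = UnsafeMutableRawPointer.allocate(byteCount: count, alignment: 4096)
    // NB: the buffer must stay alive for as long as it remains registered.

    var iov = iovec(iov_base: buffer, iov_len: count)
    guard io_uring_register_buffers(ring, &iov, 1) == 0 else { throw FixedBufferError() }

    guard let sqe = io_uring_get_sqe(ring) else { throw FixedBufferError() }
    // buf_index 0 refers back to the iovec registered above.
    io_uring_prep_write_fixed(sqe, fd, buffer, UInt32(count), 0, 0)
    io_uring_submit(ring)
}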
Getting this working reliably has been challenging! For those playing along at home (and as a note to my future self), I'll document my experience below.
Firstly, I naively assumed that non-actor-isolated asynchronous functions invoked by an actor inherit the actor's execution context. (See SE-0338, which explains this is not the case.) I had wrapped io_uring_submit() in a helper class which was thus invoked on a generic executor; this caused a race which manifested as partially initialised submissions being submitted, and completions being invoked multiple times.
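To make the pitfall concrete, here's a contrived sketch (the names are hypothetical, not the actual IORing code): per SE-0338, the free async function below runs on the global concurrent executor, not on the calling actor, so anything it shares with actor-isolated code is unprotected.

// Hypothetical illustration of the SE-0338 behaviour; not the IORing code.
actor RingActor {
    func submitSomething() async {
        prepareSQE()          // runs on this actor's executor
        await submitHelper()  // hops to the global concurrent executor!
    }

    func prepareSQE() { /* touches the submission queue */ }
}

// A non-actor-isolated async function does NOT inherit the caller's actor,
// so if it pokes at the same ring it races with actor-isolated callers.
func submitHelper() async {
    // io_uring_submit() here would run concurrently with prepareSQE()
}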
Another issue to be careful of is the ambient nature of the io_uring_submit() API. Whilst it's probably an inevitable artefact of io_uring being, well, a ring, it requires care when used in reentrant contexts: io_uring_submit() will submit all outstanding entries that have been assigned with io_uring_get_sqe(). This means you cannot have a suspension point between acquiring a SQE and preparing it for submission. (Technically you can have one between preparing it and calling io_uring_submit(), at least if you're not using linked SQEs.)
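A sketch of the hazard (hypothetical names, liburing via an assumed C shim as before): any await between io_uring_get_sqe() and the io_uring_prep_*() call gives a re-entrant caller the chance to invoke io_uring_submit() and push the half-initialised SQE to the kernel.

import CIOURing // hypothetical C shim module exposing liburing

struct NoSQEAvailable: Error {}

// Hypothetical async buffer pool; the await is what makes this dangerous.
func borrowBuffer() async -> UnsafeMutableRawBufferPointer {
    .allocate(byteCount: 4096, alignment: 16)
}

func riskyRead(ring: UnsafeMutablePointer<io_uring>, fd: Int32) async throws {
    guard let sqe = io_uring_get_sqe(ring) else { throw NoSQEAvailable() }

    // BAD: a suspension point while the SQE is still uninitialised. Another
    // task resumed here may call io_uring_submit() and flush this SQE as-is.
    let buffer = await borrowBuffer()

    io_uring_prep_read(sqe, fd, buffer.baseAddress, UInt32(buffer.count), 0)
    io_uring_submit(ring) // also submits any other pending SQEs on the ring
}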
In my initial implementation, I used withCheckedContinuation() to bridge io_uring completions to async/await; however, being a suspension point, it was possible for io_uring_submit() to be called re-entrantly before the continuation was associated with the SQE. The solution was to register a callback immediately, which would await the completion handler posting the result on an AsyncThrowingChannel. (Perhaps Swift concurrency wizards can suggest a better data structure for one-shot events? I could use a continuation, but I don't want to busy-wait in onCompletion() for its allocation.)
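Roughly, the channel-based bridge looked like the following (a simplified, hypothetical sketch using AsyncThrowingChannel from swift-async-algorithms; the real Submission type carries more state):

import AsyncAlgorithms

struct CQEError: Error { let code: Int32 }

// One-shot bridge: the completion handler pushes a single result into the
// channel, and the submitting task awaits it.
final class ChannelCompletion {
    private let channel = AsyncThrowingChannel<Int32, Error>()

    // Called (exactly once) by the CQE reaper when the kernel posts a completion.
    func onCompletion(res: Int32) async {
        if res < 0 {
            channel.fail(CQEError(code: -res))
        } else {
            await channel.send(res)
            channel.finish()
        }
    }

    // Awaited by the submitter after the SQE has been prepared and submitted.
    func result() async throws -> Int32 {
        for try await res in channel {
            return res
        }
        throw CancellationError()
    }
}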
After watching this excellent video, I've learned that withCheckedContinuation immediately executes its body and only suspends on return, so the issue I struck out above was probably just another manifestation of the race condition.
final class SingleshotSubmission<T>: Submission<T> {
    weak var group: SubmissionGroup<T>?
    private typealias Continuation = CheckedContinuation<T, Error>
    private var continuation: Continuation!

    // initialiser removed for space

    func submit() async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            // guaranteed to run immediately
            self.continuation = continuation
            Task { try await ring.submit() }
        }
    }

    override func onCompletion(cqe: io_uring_cqe) async {
        do {
            try continuation.resume(returning: throwingErrno(cqe: cqe, handler))
        } catch {
            continuation.resume(throwing: error)
        }
    }
}
PS. The irony is that I probably wouldn't have triggered the race condition were it not for an unrelated bug in my application code that resulted in it sending considerably more packets than intended.