Using SwiftNIO Future/Promise to make async/await style TCP socket interface

I'm building a TCP socket interface class in async/await style using SwiftNIO, the interface is in one-write-one-read pattern.

After some work I realized that after I write and flush some data to the socket, I can only get response data from a ChannelHandler, which in my case is ChannelInboundHandler. Its callback function channelRead got the respond data, but I have no idea how to pass that data to the interface function.

I read the docs and know that there are EventLoopFuture and EventLoopPromise which may probably can help me achieve my goal, but I just can't figure out how.

I've searched the internet and found this, I used code mentioned but didn't work.

My code:

Interface class

class PTSocket: ObservableObject {
    private var host: String
    private var port: Int

    private var handler: PTHandler?
    private var eventLoopGroup: EventLoopGroup
    private var bootstrap: ClientBootstrap
    private var channel: Channel?

    init(host: String, port: Int) {
        let handler = PTHandler()

        self.host = host
        self.port = port

        eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        bootstrap = ClientBootstrap(group: eventLoopGroup)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
            .channelInitializer { channel in
                channel.pipeline.addHandler(handler)
            }

        self.handler = handler
    }

    public func connect() async throws {
        channel = try { () -> Channel in
            return try bootstrap.connect(host: host, port: port).wait()
        }()

        // Handshake
        try await send(command: .handshake)
    }

    public func send(command: PTMagic) async throws -> [UInt8] {
        let buffer = channel!.allocator.buffer(bytes: command.magic)

        channel!.writeAndFlush(buffer, promise: nil)
        let promise = channel?.eventLoop.next().makePromise(of: [UInt8].self)
        

        
        let result = try await promise!.futureResult.get()
        print("Waiting for data:\(result)")

        return []
    }

}

Handler class

class PTHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    private var responseBytes: [UInt8] = []

    //    For reading chunks
    private var totalBytesCount: Int = 0
    private var remainingBytesCount: Int = 0
    private var existingBytes: [UInt8] = []

    //    init(callback: @escaping ([UInt8]) -> Void) {
    //        readCallback = callback
    //    }

    public func channelActive(context: ChannelHandlerContext) {
        print("Socket connected.")

    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = unwrapInboundIn(data)

        if let received = buffer.readBytes(length: buffer.readableBytes) {
            print("[Server] Reply: '\(received)'")
            responseBytes.append(contentsOf: received)
        }
        context.fireChannelRead(data)
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        print("Read complete: \(responseBytes)")
        context.fireChannelReadComplete()
        responseBytes.removeAll()
    }

    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error===: ", error)

        context.close(promise: nil)
    }
}

If I run this code, the Waiting for data print is never reached, and it blocks my next call to send.

in general, you want the handlers to be associated with exactly one channel for the callback notifications to be meaningful. so it’s often better to create them in the channelInitializer(_:) closure. one client bootstrap can call it multiple times during connection setup.

an EventLoopPromise is like a CheckedContinuation. if you never complete the promise, the future will never return. the eventLoop doesn’t associate the promise with the channel, and you haven’t passed the promise along to anything that could complete it.

We are currently working on making the interop between NIO and Swift Concurrency easier and have created some new APIs that are currently hidden behind an SPI flag. The goal for those is to make it really easy to consume data from a Channel and write data back into it.

A very basic example of setting up a TCP client which you can use to write ByteBuffers and read them looks like this

@_spi(AsyncChannel) import NIOCore
@_spi(AsyncChannel) import NIOPosix

let channel = try await ClientBootstrap(group: MultiThreadedEventLoopGroup.singleton)
    .connect(
        host: self.host,
        port: self.port
    ) { channel in
        channel.eventLoop.makeCompletedFuture {
            return try NIOAsyncChannel(
                synchronouslyWrapping: channel,
                configuration: NIOAsyncChannel.Configuration(
                    inboundType: ByteBuffer.self,
                    outboundType:  ByteBuffer.self
                )
            )
        }
    }

print("Writing data")
try await channel.outboundWriter.write(ByteBuffer(string: "Some data"))

for try await inboundData in channel.inboundStream {
    print("Received data (\(inboundData))")
}

Depending on the underlying protocol spoken on the connection you can handle some of it with ChannelHandlers. For example, it might be that your protocol is using a newline delimited framing. In general, we want to encourage people to implement network protocols as ChannelHandlers and business logic in the Concurrency world. This is a TCPEchoClient example that uses newline delimited framing to communicate with a server.

That's great to hear! I searched deeper earlier today and found this json rpc client demo, and figured out how to implement what I want for now, although I'm really not familiar with NIO, but here's what I got:

I just need to make my handler implement not only ChannelInboundHandler but also ChannelOutboundHandler, thus I can customize my writing data, aka pass write data to writeAndFlush with a newly created promise:

let promise = channel.eventLoop.makePromise(of: ByteBuffer.self)
try await channel.writeAndFlush(RequestWrapper(request: command, promise: promise))

Now in my handler I can unwrap the wrapper and get the promise, which can be stored and wait for use when channelReadComplete or whenever I like:

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
    let wrapper = unwrapOutboundIn(data)
    queue.append(wrapper)

    let buffer = context.channel.allocator.buffer(bytes: wrapper.request.magic)
    context.writeAndFlush(wrapOutboundOut(buffer), promise: promise)
}

The function above lives in my handler, it can "intercept" data I'm about to send, which is a wrapper with a promise, the function get the promise and store it for future use, in my case is channelRead.

However, my approach introduce another problem:

NIO reads data in an event loop, that means it calls channelReadComplete every time a loop ends. But in my case I have some requests that replies with data larger than which the loop can read in one loop. In this case I can't know when the handler finishes reading the whole reply data.

I know I can solve this by defining a data protocol, like first 4 bytes are data length.But I just wonder if there're other ways to achieve this, elegantly. A callback function for handler protocols triggered when there's no more data to read, for example.

This is fundamentally unsolveable. TCP does not have a notion of a "message": it's a stream abstraction, providing a stream of bytes. If you want messages inside TCP, you have to frame them yourself.

1 Like