Which NIO Example To Adapt?

I want to adapt one of the NIO Examples, but which one?

I need two headless clients to connect to one headless server, which can freely send and receive data independently - just like two private telephone conversations going on at the same time.

Most of the NIO Examples do not show how to implement a public send function, which is making life difficult for me, because I'm not very clever. I imagine having two servers on the same machine.

The machine with the servers would instantiate two of these...

final class Server {
    let host = "0.0.0.0" // all IPv4 interfaces; "::" is the IPv6 equivalent ("::1" is loopback only)
    
    func connect(onPort: Int, withCallback: ((Data) -> Void)) {
        ...
    }
    
    func isConnected() -> Bool {
        ...
    }
    
    func send(data: Data) {
        ...
    }
    
    func disconnect() {
        ...
    }
}

Each client machine would instantiate one of these...


final class Client {
    
    func connect(toHost: String, onPort: Int, withCallback: ((Data) -> Void)) {
        ...
    }
    
    func isConnected() -> Bool {
        ...
    }
    
    func send(data: Data) {
        ...
    }
    
    func disconnect() {
        ...
    }
}

Please could someone advise on where to start and offer help as I fill in the dots? My knowledge of Swift is basic. For example, I don't even know if the callback parameter should be defined as @escaping or one of those other words inside square brackets.
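On the @escaping question: a closure parameter needs @escaping whenever it is stored, or called after the function has returned, which is what a receive callback does. The following is only a minimal sketch of that rule. `CallbackHolder` and `simulateReceive` are hypothetical names made up for illustration and are not part of NIO.

```swift
import Foundation

// Hypothetical type for illustration only; not part of NIO.
final class CallbackHolder {
    private var onData: ((Data) -> Void)?

    // `@escaping` is required here because the closure is stored in a
    // property and therefore outlives this call.
    func connect(withCallback callback: @escaping (Data) -> Void) {
        self.onData = callback
    }

    // Simulates data arriving later, after `connect` has returned.
    func simulateReceive(_ data: Data) {
        self.onData?(data)
    }
}

func escapingDemo() -> [String] {
    var received: [String] = []
    let holder = CallbackHolder()
    holder.connect { data in
        received.append(String(decoding: data, as: UTF8.self))
    }
    holder.simulateReceive(Data("hello".utf8))
    return received
}
```

Without @escaping on the parameter, the assignment to `onData` would not compile.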

So here's an example that roughly meets the shape of your program. The big limiting factor that prevents us meeting the API you've outlined is that because NIO is reactive and evented, it's not possible for us to create server connections via a connect function. Server connections are created when clients initiate them, so we have to wait for them, and necessarily they end up being created via a callback. As a result we had to introduce a new ServerFactory type that is responsible for creating the Server objects as connections come in.

I've also applied a bit of personal style here: rather than allow you to construct either of these via init I've chosen to construct them with static funcs. This is because these construction functions are both blocking, and I think it's bad form to block indefinitely in an init.

A final note: we're working on making this a lot easier by using the tools from Swift concurrency. I recommend taking a look at the example in this work-in-progress PR, which shows a worked example of how we'd create servers using a concurrency-aware version of NIO. This has a bunch of edge cases we still have to work out, but it's our vision of the future and well worth comparing to what we have here.

Without further ado, the server example:

import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat

struct ServerFactory {
    static func listen(host: String, port: Int, _ connectionCallback: @escaping (Server) -> Void) throws {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        defer {
            try! group.syncShutdownGracefully()
        }

        let bootstrap = ServerBootstrap(group: group)
            .childChannelInitializer { channel in
                let server = Server(channel: channel)
                connectionCallback(server)
                return channel.eventLoop.makeSucceededVoidFuture()
            }

        let channel = try bootstrap.bind(host: host, port: port).wait()

        // Park the main thread here.
        try channel.closeFuture.wait()
    }
}

final class Server {
    typealias DataCallback = (Data) -> Void

    private final class ServerHandler: ChannelInboundHandler {
        typealias InboundIn = ByteBuffer

        fileprivate var dataCallback: DataCallback?

        init() {
            self.dataCallback = nil
        }

        func channelRead(context: ChannelHandlerContext, data: NIOAny) {
            let buffer = self.unwrapInboundIn(data)
            self.dataCallback?(Data(buffer: buffer))
        }
    }

    private let channel: Channel

    // This hides the existence of the `ServerHandler` class but allows us to expose the property we want to let the user set.
    // We use `preconditionInEventLoop` here because this operation is not thread-safe: you must only set this from the event
    // loop thread.
    var dataCallback: DataCallback? {
        get {
            self.channel.eventLoop.preconditionInEventLoop()
            return try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback
        }
        set {
            self.channel.eventLoop.preconditionInEventLoop()
            try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback = newValue
        }
    }

    init(channel: Channel) {
        self.channel = channel
    }

    var isConnected: Bool {
        return self.channel.isActive
    }

    func send(_ data: Data) {
        self.channel.writeAndFlush(ByteBuffer(data: data), promise: nil)
    }

    func disconnect() {
        self.channel.close(promise: nil)
    }
}

And the client example:

final class Client {
    typealias DataCallback = (Data) -> Void

    private final class ClientHandler: ChannelInboundHandler {
        typealias InboundIn = ByteBuffer

        fileprivate var dataCallback: DataCallback

        init(dataCallback: @escaping DataCallback) {
            self.dataCallback = dataCallback
        }

        func channelRead(context: ChannelHandlerContext, data: NIOAny) {
            let buffer = self.unwrapInboundIn(data)
            self.dataCallback(Data(buffer: buffer))
        }
    }

    private let channel: Channel

    private init(channel: Channel) {
        self.channel = channel
    }

    static func connect(group: EventLoopGroup, host: String, port: Int, _ callback: @escaping DataCallback) throws -> Client {
        let bootstrap = ClientBootstrap(group: group)
            .channelInitializer { channel in
                return channel.pipeline.addHandler(ClientHandler(dataCallback: callback))
            }

        let channel = try bootstrap.connect(host: host, port: port).wait()

        return Client(channel: channel)
    }

    var isConnected: Bool {
        return self.channel.isActive
    }

    func send(_ data: Data) {
        self.channel.writeAndFlush(ByteBuffer(data: data), promise: nil)
    }

    func disconnect() {
        self.channel.close(promise: nil)
    }
}

If you have questions about any of this please do feel free to ask.

I most certainly will, but for now, all I can say is "WOW!". You may remember me saying something similar last year when you were helping me adapt the NIOEchoServer. That's been working just fine, but now, as my project evolves into an interconnected network with unique APIs, I need more. Thank you so much, Cory, for coming up with a really good example and mentioning what the future will bring.

I really wish I could contribute to the development going on here, but it's way above my head. I enjoy writing my own stuff and complaining when things aren't the way I'd like. However, I was able to report a Swift bug and you guys fixed it in a point release.

Hi Cory,

I've got the linux client talking full duplex to a macOS app called Socket Debugger - with this...

import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat

struct MsgAPI: Codable {
    var str: String
}

final class Application {
    var clientOne: Client?
    
    func run() {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        do {
            clientOne = try Client.connect(group: group, host: "iMac.local", port: 8001, callbackOne)
        } catch {
            print(error)
            exit(1)
        }
        doSomeTesting()
    }
    
    func doSomeTesting() {
        print("clientOne \(clientOne!.isConnected ? "is" : "not") connected")
        if clientOne!.isConnected {
            Task {
                for n in 1...10 {
                    do {
                        let msg = MsgAPI(str: "clientOne sent \(n)")
                        let data =  try JSONEncoder().encode(msg)
                        clientOne!.send(data)
                    } catch {
                        print(error)
                    }
                    try! await Task.sleep(nanoseconds: 2_000_000_000)
                }
                clientOne!.disconnect()
                exit(0)
            }
        }
    }
    
    func callbackOne(_ data: Data) {
        do {
            let msg = try JSONDecoder().decode(MsgAPI.self, from: data)
            print(msg)
        } catch {
            print(error)
        }
    }
}

Works like a dream, so I'm looking forward to creating multiple clients and servers on the same machine but with different ports and data APIs.

However, and after many attempts, I'm not having much luck with the server. Here's my latest...

import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat

struct MsgAPI: Codable {
    var str: String
}

final class Application {
    var serverOne: Server?
    
    func run() {
        do {
            try ServerFactory.listen(host: "0.0.0.0", port: 8001, newServer)
        } catch {
            print(error)
            exit(1)
        }
        doSomeTesting()
    }
    
    private func newServer(_ server: Server) {
        serverOne = server
        serverOne!.dataCallback = callbackOne
    }
    
    func doSomeTesting() {
        print("serverOne \(serverOne!.isConnected ? "is" : "not") running")
        if serverOne!.isConnected {
            Task {
                var n = 0
                while n < 10 {
                    try! await Task.sleep(nanoseconds: 2_000_000_000)
                    do {
                        n += 1
                        let msg = MsgAPI(str: "serverOne sent \(n)")
                        let data =  try JSONEncoder().encode(msg)
                        serverOne!.send(data)
                    } catch {
                        print(error)
                    }
                }
                serverOne!.disconnect()
                exit(0)
            }
        }
    }
    
    func callbackOne(_ data: Data) {
        do {
            let response = try JSONDecoder().decode(MsgAPI.self, from: data)
            print(response)
        } catch {
            print(error)
        }
    }
}

I think I'm failing to understand how to create the server, so please can you help me out with this?

A couple of minor observations: isConnected() doesn't return false when not connected, and it would be nice if the client var didn't have to be an optional. I also wonder if this could be implemented as a single P2P class, but perhaps I digress.

Sorry for including the entire code, but I thought others might find it helpful.

Hi Cory. Here's my latest attempt to create the server.

final class Application {
    var serverOne: Server?
    
    func run() {
        do {
            try ServerFactory.listen(host: "0.0.0.0", port: 8001, { (s: Server) in
                s.dataCallback = self.callbackOne
                self.serverOne = s
            })
        } catch {
            print(error)
            exit(1)
        }
     }

It compiles and runs, but crashes when the client connects.

CoryServers/Server.swift:67: Fatal error: 'try!' expression unexpectedly raised an error: NIOCore.ChannelPipelineError.notFound
Current stack trace:
0    libswiftCore.so                    0x0000007fb4b031a8 _swift_stdlib_reportFatalErrorInFile + 128
1    libswiftCore.so                    0x0000007fb482ca34 _assertionFailure(_:_:file:line:flags:) + 300
2    libswiftCore.so                    0x0000007fb4877d8c Dictionary.init<A>(_:uniquingKeysWith:) + 0
3    CoryServers                        0x000000556dd08130 <unavailable> + 295216
4    CoryServers                        0x000000556dd049ec <unavailable> + 281068
...
60:    var dataCallback: DataCallback? {
61:        get {
62:            self.channel.eventLoop.preconditionInEventLoop()
63:            return try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback
64:        }
65:        set {
66:            self.channel.eventLoop.preconditionInEventLoop()
67:            try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback = newValue
68:        }
69:    }

Ah shoot, I missed an important part of the server setup. Can you take the block that begins with .childChannelInitializer and replace it with:

            .childChannelInitializer { channel in
                channel.pipeline.syncOperations.addHandler(Server.ServerHandler())
                let server = Server(channel: channel)
                connectionCallback(server)
                return channel.eventLoop.makeSucceededVoidFuture()
            }

You'll have to make the ServerHandler class internal as well.

I needed to insert try! to get it to compile

            .childChannelInitializer { channel in
//                channel.pipeline.syncOperations.addHandler(Server.ServerHandler())
                try! channel.pipeline.syncOperations.addHandler(Server.ServerHandler())

and remove private

//    private final class ServerHandler: ChannelInboundHandler {
    internal final class ServerHandler: ChannelInboundHandler {

So now the server will accept a connection and receive data, but it won't send.

final class Application {
    var serverOne: Server?
    
    func run() {
        do {
            try ServerFactory.listen(host: "0.0.0.0", port: 8001, { (s: Server) in
                s.dataCallback = self.callbackOne
                self.serverOne = s
            })
            print("after ServerFactory in \(#function)")
        } catch {
            print(error)
            exit(1)
        }
        doSomeTesting()
    }

The try block never returns, so print("after... and doSomeTesting() don't get called.

Is it because I'm not using ServerFactory correctly?

Yes. listen runs forever: it parks the main thread. You need to kick off your work from within the callback you pass to listen.

Thanks Cory. I now have 2 servers working on different ports - each as a separate Task. I hope to be able to build on this.

Great stuff! As always, please ask away if you have further questions.

Just one question for now: I'll also need a UDP Client to run alongside your Client and Server code. I assume the Client Example bootstrap could be changed, but where?

You can use DatagramBootstrap instead. Note that at this time we don't support connected-mode UDP, so instead of using connect you'll use bind and then send AddressedEnvelope<ByteBuffer> messages that contain the address you want to send to.
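A rough sketch of what that could look like, assuming the same NIOCore and NIOPosix modules as the TCP examples. `PrintingHandler` and `sendDatagram` are hypothetical names chosen for illustration, and the sketch closes the channel straight after sending, which a real client that expects replies would not do.

```swift
import NIOCore
import NIOPosix

// Hypothetical handler: prints every datagram received on the bound socket.
final class PrintingHandler: ChannelInboundHandler {
    typealias InboundIn = AddressedEnvelope<ByteBuffer>

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let envelope = self.unwrapInboundIn(data)
        print("from \(envelope.remoteAddress): \(String(buffer: envelope.data))")
    }
}

// Hypothetical helper: binds a local UDP socket, sends one datagram, closes.
func sendDatagram(group: EventLoopGroup, text: String, toHost host: String, port: Int) throws {
    let channel = try DatagramBootstrap(group: group)
        .channelInitializer { channel in
            channel.pipeline.addHandler(PrintingHandler())
        }
        // bind, not connect. Port 0 asks the OS for any free local port.
        .bind(host: "0.0.0.0", port: 0)
        .wait()

    // Each outbound message carries its own destination address.
    let remote = try SocketAddress.makeAddressResolvingHost(host, port: port)
    let envelope = AddressedEnvelope(remoteAddress: remote, data: ByteBuffer(string: text))
    try channel.writeAndFlush(envelope).wait()
    try channel.close().wait()
}
```

The main differences from the TCP client are the bootstrap type, the use of bind, and the fact that both inbound and outbound messages are AddressedEnvelope<ByteBuffer> rather than plain ByteBuffer.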

Whilst I've been able to containerise the Client into a self-contained class, I can't find a way to do the same for the Server. I need to get out of the ServerFactory's closure. That doesn't matter in a simple proof-of-concept app, but it will when I need to insert 1000 lines with other classes, actors and tasks. With extra servers or clients, I think thread management will become a nightmare, with everything interconnected in some way or other.

I know you're very busy, but it will be most helpful if you can come up with a solution just for me.

This diagram should explain what I've been up to over the past year, most of which is working - except for my grasp on networks.

Yeah this is the problem that our linked Swift concurrency work will be solving. The only way to "escape" the closure is to introduce some abstraction that can turn a reactive, push-based stream (which NIO provides) into a blocking, pull-based one. We have these abstractions in the works for Swift concurrency, where we'll be creating a number of backpressure-propagating AsyncSequences to make this easier.

Without that you're ultimately needing to construct a thread-safe blocking queue of some kind. This is just hard, no two ways about it. If you're using Swift concurrency then a thing you can do is to replace the implementation of the callback that provides Server with an AsyncStream, and then consume that AsyncStream from within your application. This is imperfect, but it'll help substantially.
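To make the AsyncStream suggestion concrete, here is a minimal sketch that does not depend on NIO. `PushSource` is a hypothetical stand-in for any object that delivers data through a callback (such as the `dataCallback` on the `Server` class above), and `makeStream` and `collectMessages` are illustrative names. It assumes the default unbounded buffering of AsyncStream, so it does not propagate backpressure.

```swift
import Foundation

// Hypothetical stand-in for an object that pushes data through callbacks.
final class PushSource {
    var dataCallback: ((Data) -> Void)?
    var closeCallback: (() -> Void)?

    // Simulates a connection delivering some chunks and then closing.
    func simulate(_ chunks: [String]) {
        for chunk in chunks { dataCallback?(Data(chunk.utf8)) }
        closeCallback?()
    }
}

// Turns the push-based callbacks into a pull-based AsyncStream.
// AsyncStream buffers without bound by default, so a fast sender is
// never slowed down by a slow consumer.
func makeStream(from source: PushSource) -> AsyncStream<Data> {
    AsyncStream { continuation in
        source.dataCallback = { data in
            _ = continuation.yield(data)
        }
        source.closeCallback = {
            continuation.finish()
        }
    }
}

// The consumer pulls from the stream with an ordinary loop, outside any closure.
func collectMessages() async -> [String] {
    let source = PushSource()
    let stream = makeStream(from: source)
    source.simulate(["one", "two", "three"])

    var messages: [String] = []
    for await data in stream {
        messages.append(String(decoding: data, as: UTF8.self))
    }
    return messages
}
```

The point of the design is that the `for await` loop can live anywhere in the application, for example inside an actor or a long-running Task, rather than inside the connection callback.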

I'm away for the next week, so I'll play around with AsyncStream when I return. Can it be used on Linux?

Does the concurrency team have a possible release date? Next year, this year, or soon?

Yes.

I can't commit to anything stronger than "this year". We really want to get it out the door, but we're swamped at the moment and so we're trying to eke out whatever time we have to move this forward.

I gave up on AsyncStream, but I am having some success with my own convoluted approach, which works well enough on Linux, but not with SwiftUI and @MainActor on macOS.

Note: NIO_TCP_Client is an encapsulated version of your example client.

func connect() {
   satServerClient = try NIO_TCP_Client.connect(host: "satserver.local", port: 8001, satServerCallback)
// COMPILE ERROR IS:
// Converting function value of type '@MainActor (Data) -> ()' to '(Data) -> Void' loses global actor 'MainActor'
}

func satServerCallback(data: Data) {
   do {
      var response = try JSONDecoder().decode(ResponseAPI.self, from: data)
      displayResponse(data: &response) // this func is on the main thread
   } catch {
      print("Unable to decode:\n\(error)")
   }
}

Not being able to solve this "little problem" is the only thing preventing the entire system from working.

Is there a simple way to keep the callback address on the main thread?

I think you should be able to mark the callback parameter in .connect as @MainActor. Alternatively, you could use an explicit trailing closure to wrap it:

satServerClient = try NIO_TCP_Client.connect(host: "satserver.local", port: 8001) { 
    self.satServerCallback(data: $0)
}

This can be annotated as @MainActor as needed.

Thank you, Cory. Your ($0) solution is working, but it was also necessary to change the callback function to this...

    func satServerCallback(data: Data) {
        DispatchQueue.main.async {
            do {
                var res = try JSONDecoder().decode(ResponseAPI.self, from: data)
                self.displayResponse(data: &res)
            } catch {
                print("Unable to decode:\n\(error)")
            }
        }
    }

It seems to me one can throw @MainActor modifiers around all over the place, but they rarely make a difference. If or when Apple hand SwiftUI to the community, things might improve. For example, it's very slow to render a highly populated UI on an iMac. Slow to the point where it's almost unusable - presumably because all the buttons and what-nots are rendering on the same thread.

However, I now have a system that is working "good enough" for now. I do have some decoding errors occurring, but this is because SwiftUI can't keep up with the incoming data.

Unable to decode:
dataCorrupted(Swift.DecodingError.Context(codingPath: [], debugDescription: "The given data was not valid JSON.", underlyingError: Optional(Error Domain=NSCocoaErrorDomain Code=3840 "Garbage at end around line 1, column 1291." UserInfo={NSDebugDescription=Garbage at end around line 1, column 1291., NSJSONSerializationErrorIndex=1291})))

My mother gave me Cocoa before sending me to bed.
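A note on the "Garbage at end" error: it is consistent with more than one JSON document arriving in a single read. TCP is a byte stream, so one channelRead may carry several messages or only part of one, and the decoder then sees extra bytes after the first complete object. Whether that is the actual cause here is an assumption, not something confirmed in the thread. One common remedy is to frame the messages, for example with a newline after each one. The sketch below uses a hypothetical `LineFramer` type and assumes every sender appends "\n" to each encoded message.

```swift
import Foundation

// Hypothetical helper: accumulates bytes and emits one complete frame per
// newline. Assumes each sender appends "\n" after every JSON message and that
// the JSON itself contains no raw newlines (true of JSONEncoder's default,
// non-pretty-printed output).
struct LineFramer {
    private var buffer = Data()

    mutating func append(_ chunk: Data) -> [Data] {
        buffer.append(chunk)
        var frames: [Data] = []
        while let newline = buffer.firstIndex(of: 0x0A) {
            frames.append(Data(buffer[buffer.startIndex..<newline]))
            // Copy the remainder so indices start again from zero.
            buffer = Data(buffer[buffer.index(after: newline)...])
        }
        return frames
    }
}

struct MsgAPI: Codable, Equatable {
    var str: String
}

func framingDemo() throws -> [MsgAPI] {
    var framer = LineFramer()
    var decoded: [MsgAPI] = []
    // Two messages arriving in one read, then one message split across reads.
    let reads = [
        "{\"str\":\"a\"}\n{\"str\":\"b\"}\n{\"st",
        "r\":\"c\"}\n",
    ]
    for read in reads {
        for frame in framer.append(Data(read.utf8)) {
            decoded.append(try JSONDecoder().decode(MsgAPI.self, from: frame))
        }
    }
    return decoded
}
```

In a NIO pipeline the same job would more naturally be done by a ByteToMessageDecoder placed ahead of the handler that calls the data callback, so that each callback receives exactly one message.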