I want to adapt one of the NIO Examples, but which one?
I need two Headless Clients to connect to one Headless Server, each able to send and receive data freely and independently - just like two private telephone conversations going on at the same time.
Most of the NIO Examples do not show how to implement a Public Send function which is making life difficult for me, because I'm not very clever. I imagine having two servers on the same machine.
The machine with the servers would instantiate two of these...
final class Server {
    let host = "0.0.0.0" // or "::1"
    func connect(onPort: Int, withCallback: ((Data) -> Void)) {
        ...
    }
    func isConnected() -> Bool {
        ...
    }
    func send(data: Data) {
        ...
    }
    func disconnect() {
        ...
    }
}
Each client machine would instantiate one of these...
Please could someone advise on where to start and offer help as I fill in the dots? My knowledge of Swift is basic. For example, I don't even know if the callback parameter should be marked @escaping or given one of those other attributes.
So here's an example that roughly meets the shape of your program. The big limiting factor that prevents us meeting the API you've outlined is that because NIO is reactive and evented, it's not possible for us to create server connections via a connect function. Server connections are created when clients initiate them, so we have to wait for them, and necessarily they end up being created via a callback. As a result we had to introduce a new ServerFactory type that is responsible for creating the Server objects as connections come in.
I've also applied a bit of personal style here: rather than allow you to construct either of these via init I've chosen to construct them with static funcs. This is because these construction functions are both blocking, and I think it's bad form to block indefinitely in an init.
A final note: we're working on making this a lot easier by using the tools from Swift concurrency. I recommend taking a look at the example in this work-in-progress PR, which would show a worked example of how we'd create servers using a concurrency-aware version of NIO. This has a bunch of edge cases we still have to work out, but it's our vision of the future and well worth comparing to what we have here.
Without further ado, the server example:
import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat
struct ServerFactory {
    static func listen(host: String, port: Int, _ connectionCallback: @escaping (Server) -> Void) throws {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        defer {
            try! group.syncShutdownGracefully()
        }
        let bootstrap = ServerBootstrap(group: group)
            .childChannelInitializer { channel in
                let server = Server(channel: channel)
                connectionCallback(server)
                return channel.eventLoop.makeSucceededVoidFuture()
            }
        let channel = try bootstrap.bind(host: host, port: port).wait()
        // Park the main thread here.
        try channel.closeFuture.wait()
    }
}

final class Server {
    typealias DataCallback = (Data) -> Void

    private final class ServerHandler: ChannelInboundHandler {
        typealias InboundIn = ByteBuffer
        fileprivate var dataCallback: DataCallback?
        init() {
            self.dataCallback = nil
        }
        func channelRead(context: ChannelHandlerContext, data: NIOAny) {
            let buffer = self.unwrapInboundIn(data)
            self.dataCallback?(Data(buffer: buffer))
        }
    }

    private let channel: Channel

    // This hides the existence of the `ServerHandler` class but allows us to expose the property we want to let the user set.
    // We use `preconditionInEventLoop` here because this operation is not thread-safe: you must only set this from the event
    // loop thread.
    var dataCallback: DataCallback? {
        get {
            self.channel.eventLoop.preconditionInEventLoop()
            return try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback
        }
        set {
            self.channel.eventLoop.preconditionInEventLoop()
            try! self.channel.pipeline.syncOperations.handler(type: ServerHandler.self).dataCallback = newValue
        }
    }

    init(channel: Channel) {
        self.channel = channel
    }

    var isConnected: Bool {
        return self.channel.isActive
    }

    func send(_ data: Data) {
        self.channel.writeAndFlush(ByteBuffer(data: data), promise: nil)
    }

    func disconnect() {
        self.channel.close(promise: nil)
    }
}
I most certainly will, but for now, all I can say is "WOW!". You may remember me saying something similar last year when you were helping me adapt the NIOEchoServer. That's been working just fine, but now, as my project evolves into an interconnected network with unique APIs, I need more. Thank you so much, Cory, for coming up with a really good example and mentioning what the future will bring.
I really wish I could contribute to the development going on here, but it's way above my head. I enjoy writing my own stuff and complaining when things aren't the way I'd like. However, I was able to report a Swift bug and you guys fixed it in a point release.
I've got the Linux client talking full duplex to a macOS app called Socket Debugger - with this...
import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat
struct MsgAPI: Codable {
    var str: String
}

final class Application {
    var clientOne: Client?

    func run() {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        do {
            clientOne = try Client.connect(group: group, host: "iMac.local", port: 8001, callbackOne)
        } catch {
            print(error)
            exit(1)
        }
        doSomeTesting()
    }

    func doSomeTesting() {
        print("clientOne \(clientOne!.isConnected ? "is" : "not") connected")
        if clientOne!.isConnected {
            Task {
                for n in 1...10 {
                    do {
                        let msg = MsgAPI(str: "clientOne sent \(n)")
                        let data = try JSONEncoder().encode(msg)
                        clientOne!.send(data)
                    } catch {
                        print(error)
                    }
                    try! await Task.sleep(nanoseconds: 2_000_000_000)
                }
                clientOne!.disconnect()
                exit(0)
            }
        }
    }

    func callbackOne(_ data: Data) {
        do {
            let msg = try JSONDecoder().decode(MsgAPI.self, from: data)
            print(msg)
        } catch {
            print(error)
        }
    }
}
Works like a dream, so I'm looking forward to creating multiple clients and servers on the same machine but with different ports and data APIs.
However, and after many attempts, I'm not having much luck with the server. Here's my latest...
import Foundation
import NIOCore
import NIOPosix
import NIOFoundationCompat
struct MsgAPI: Codable {
    var str: String
}

final class Application {
    var serverOne: Server?

    func run() {
        do {
            try ServerFactory.listen(host: "0.0.0.0", port: 8001, newServer)
        } catch {
            print(error)
            exit(1)
        }
        doSomeTesting()
    }

    private func newServer(_ server: Server) {
        serverOne = server
        serverOne!.dataCallback = callbackOne
    }

    func doSomeTesting() {
        print("serverOne \(serverOne!.isConnected ? "is" : "not") running")
        if serverOne!.isConnected {
            Task {
                var n = 0
                while n < 10 {
                    try! await Task.sleep(nanoseconds: 2_000_000_000)
                    do {
                        n += 1
                        let msg = MsgAPI(str: "serverOne sent \(n)")
                        let data = try JSONEncoder().encode(msg)
                        serverOne!.send(data)
                    } catch {
                        print(error)
                    }
                }
                serverOne!.disconnect()
                exit(0)
            }
        }
    }

    func callbackOne(_ data: Data) {
        do {
            let response = try JSONDecoder().decode(MsgAPI.self, from: data)
            print(response)
        } catch {
            print(error)
        }
    }
}
I think I'm failing to understand how to create the server, so please can you help me out with this?
A couple of minor observations: isConnected() doesn't return false when not connected, and it would be nice if the client var didn't have to be an optional. I also wonder if this could be implemented as a single P2P class, but perhaps I digress.
Sorry for including the entire code, but I thought others might find it helpful.
Ah shoot, I missed an important part of the server setup. Can you take the block that begins with .childChannelInitializer and replace it with:
.childChannelInitializer { channel in
    channel.pipeline.syncOperations.addHandler(Server.ServerHandler())
    let server = Server(channel: channel)
    connectionCallback(server)
    return channel.eventLoop.makeSucceededVoidFuture()
}
You'll have to make the ServerHandler class internal as well.
Just one question for now: I'll also need a UDP Client to run alongside your Client and Server code. I assume the Client Example bootstrap could be changed, but where?
You can use DatagramBootstrap instead. Note that at this time we don't support connected-mode UDP, so instead of using connect you'll use bind and then send AddressedEnvelope<ByteBuffer> messages that contain the address you want to send to.
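A minimal sketch of that bind-then-send pattern, assuming a loopback test on 127.0.0.1 with OS-assigned ports. `FirstDatagramHandler` and `sendOneDatagram` are hypothetical names made up for this illustration; `DatagramBootstrap`, `AddressedEnvelope` and `SocketAddress` are the NIO types mentioned above. It is not a full UDP client, just the smallest shape that sends one datagram and reads it back.

```swift
import NIOCore
import NIOPosix

// Hypothetical handler: hands the payload of the first datagram to a promise.
final class FirstDatagramHandler: ChannelInboundHandler {
    typealias InboundIn = AddressedEnvelope<ByteBuffer>
    private let promise: EventLoopPromise<String>

    init(promise: EventLoopPromise<String>) {
        self.promise = promise
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Datagram channels deliver envelopes: payload plus the sender's address.
        let envelope = self.unwrapInboundIn(data)
        self.promise.succeed(String(buffer: envelope.data))
    }
}

// Hypothetical helper: binds two UDP sockets and sends "hello" from one to the other.
func sendOneDatagram() throws -> String {
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    defer { try? group.syncShutdownGracefully() }

    let received = group.next().makePromise(of: String.self)

    // The "remote" side: bind (not connect) and install the handler.
    let receiver = try DatagramBootstrap(group: group)
        .channelInitializer { channel in
            channel.pipeline.addHandler(FirstDatagramHandler(promise: received))
        }
        .bind(host: "127.0.0.1", port: 0)
        .wait()

    // The sending side: also bind, then address every write explicitly.
    let sender = try DatagramBootstrap(group: group)
        .bind(host: "127.0.0.1", port: 0)
        .wait()

    let envelope = AddressedEnvelope(
        remoteAddress: receiver.localAddress!,
        data: ByteBuffer(string: "hello")
    )
    try sender.writeAndFlush(envelope).wait()

    return try received.futureResult.wait()
}
```

In a real client the destination would come from something like `SocketAddress.makeAddressResolvingHost(_:port:)` rather than the receiver's own `localAddress`.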
Whilst I've been able to containerise the Client into a self-contained class, I can't find a way to do the same for the Server. I need to get out of the ServerFactory's closure. That doesn't matter in a simple proof-of-concept app, but it will once I need to insert 1000 lines with other classes, actors and tasks. With extra servers or clients, I think thread management will become a nightmare, with everything interconnected in some way or other.
I know you're very busy, but it will be most helpful if you can come up with a solution just for me.
This diagram should explain what I've been up to over the past year, most of which is working - except for my grasp on networks.
Yeah this is the problem that our linked Swift concurrency work will be solving. The only way to "escape" the closure is to introduce some abstraction that can turn a reactive, push-based stream (which NIO provides) into a blocking, pull-based one. We have these abstractions in the works for Swift concurrency, where we'll be creating a number of backpressure-propagating AsyncSequences to make this easier.
Without that you're ultimately needing to construct a thread-safe blocking queue of some kind. This is just hard, no two ways about it. If you're using Swift concurrency then a thing you can do is to replace the implementation of the callback that provides Server with an AsyncStream, and then consume that AsyncStream from within your application. This is imperfect, but it'll help substantially.
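A rough sketch of that callback-to-AsyncStream idea, using only the standard library. `Connection` and `Listener` are hypothetical stand-ins for the thread's `Server` and `ServerFactory.listen`; the point is only the bridging function. `AsyncStream` buffers without limit by default, so nothing here propagates backpressure, which is one sense in which the approach is imperfect.

```swift
// Hypothetical stand-in for the thread's `Server` object.
struct Connection {
    let id: Int
}

// Hypothetical stand-in for a callback-based listener such as `ServerFactory.listen`.
final class Listener {
    private var onConnection: ((Connection) -> Void)?
    private var onShutdown: (() -> Void)?

    func start(onConnection: @escaping (Connection) -> Void,
               onShutdown: @escaping () -> Void) {
        self.onConnection = onConnection
        self.onShutdown = onShutdown
    }

    // Test hooks that play the role of the network.
    func simulateAccept(id: Int) { self.onConnection?(Connection(id: id)) }
    func simulateShutdown() { self.onShutdown?() }
}

// Bridge: every callback invocation becomes one element of the stream.
func connections(from listener: Listener) -> AsyncStream<Connection> {
    AsyncStream { continuation in
        listener.start(
            onConnection: { connection in continuation.yield(connection) },
            onShutdown: { continuation.finish() }
        )
    }
}

// Consumer: application code pulls connections with `for await`
// instead of living inside the callback closure.
func collectIDs(from stream: AsyncStream<Connection>) async -> [Int] {
    var ids: [Int] = []
    for await connection in stream {
        ids.append(connection.id)
    }
    return ids
}
```

With this shape the accept loop can sit in an ordinary `Task` or actor method, and each connection can be handed to whatever part of the application owns it.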
I can't commit to anything stronger than "this year". We really want to get it out the door, but we're swamped at the moment and so we're trying to eke out whatever time we have to move this forward.
I gave up on AsyncStream, but I am having some success with my own convoluted way, which works well enough on Linux, but not with SwiftUI and @MainActor on macOS.
Note: NIO_TCP_Client is an encapsulated version of your example client.
func connect() {
    satServerClient = try NIO_TCP_Client.connect(host: "satserver.local", port: 8001, satServerCallback)
    // COMPILE ERROR IS:
    // Converting function value of type '@MainActor (Data) -> ()' to '(Data) -> Void' loses global actor 'MainActor'
}
func satServerCallback(data: Data) {
    do {
        var response = try JSONDecoder().decode(ResponseAPI.self, from: data)
        displayResponse(data: &response) // this func is on the main thread
    } catch {
        print("Unable to decode:\n\(error)")
    }
}
Not being able to solve this "little problem" is the only thing preventing the entire system from working.
Is there a simple way to keep the callback address on the main thread?
I think you should be able to mark the callback parameter in .connect as @MainActor. Alternatively, you could use an explicit trailing callback to wrap it:
Thank you, Cory. Your ($0) solution is working, but it was also necessary to change the callback function to this...
func satServerCallback(data: Data) {
    DispatchQueue.main.async {
        do {
            var res = try JSONDecoder().decode(ResponseAPI.self, from: data)
            self.displayResponse(data: &res)
        } catch {
            print("Unable to decode:\n\(error)")
        }
    }
}
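For comparison, here is a minimal sketch of the same hop written with `Task { @MainActor in ... }` instead of `DispatchQueue.main.async`. Everything named here is hypothetical: `ResponseViewModel` stands in for the SwiftUI-facing object, and `fakeConnect` stands in for `NIO_TCP_Client.connect`, assuming it takes a plain non-isolated callback and invokes it off the main thread.

```swift
import Foundation

// Hypothetical stand-in for the UI-facing object; all of its state is main-actor isolated.
@MainActor
final class ResponseViewModel {
    private(set) var lastMessage = ""

    func display(_ data: Data) {
        self.lastMessage = String(decoding: data, as: UTF8.self)
    }
}

// Hypothetical stand-in for the network client: it accepts a plain,
// non-isolated callback and calls it from a background queue.
func fakeConnect(_ callback: @escaping @Sendable (Data) -> Void) {
    DispatchQueue.global().async {
        callback(Data("hello".utf8))
    }
}

// The closure passed to the client is not main-actor isolated, so no
// global actor is lost in the conversion. The hop to the main actor
// happens explicitly inside the closure.
@MainActor
func wire(_ model: ResponseViewModel) {
    fakeConnect { data in
        Task { @MainActor in
            model.display(data)
        }
    }
}
```

Either form leaves the network layer unaware of the main actor; only the closure body decides where the UI update runs.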
It seems to me one can throw @MainActor modifiers around all over the place, but they rarely make a difference. If or when Apple hand SwiftUI to the community, things might improve. For example, it's very slow to render a highly populated UI on an iMac. Slow to the point where it's almost unusable - presumably because all the buttons and what-nots are rendering on the same thread.
However, I now have a system that is working "good enough" for now. I do have some decoding errors occurring, but this is because SwiftUI can't keep up with the incoming data.
Unable to decode:
dataCorrupted(Swift.DecodingError.Context(codingPath: [], debugDescription: "The given data was not valid JSON.", underlyingError: Optional(Error Domain=NSCocoaErrorDomain Code=3840 "Garbage at end around line 1, column 1291." UserInfo={NSDebugDescription=Garbage at end around line 1, column 1291., NSJSONSerializationErrorIndex=1291})))
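One possible reading of this error, offered as an assumption rather than a diagnosis of this particular app: "Garbage at end" is what the decoder reports when extra bytes follow a complete JSON value, and TCP delivers a byte stream without message boundaries, so two messages arriving in a single read would look exactly like this. A minimal sketch of newline-delimited framing, assuming the sender appends a `\n` after each encoded message (`LineFramer` and `decodeSplitStream` are hypothetical names, and `MsgAPI` mirrors the struct used earlier in the thread):

```swift
import Foundation

struct MsgAPI: Codable, Equatable {
    var str: String
}

// Hypothetical helper: accumulates bytes and returns only complete,
// newline-terminated frames. Partial data stays buffered for the next read.
struct LineFramer {
    private var pending = Data()

    mutating func append(_ chunk: Data) -> [Data] {
        self.pending.append(chunk)
        var frames: [Data] = []
        while let newline = self.pending.firstIndex(of: 0x0A) {
            // Copy into fresh Data values so indices start at zero again.
            frames.append(Data(self.pending[self.pending.startIndex..<newline]))
            self.pending = Data(self.pending[self.pending.index(after: newline)...])
        }
        return frames
    }
}

// Simulates two messages arriving as chunks that ignore message boundaries.
func decodeSplitStream() throws -> [MsgAPI] {
    let encoder = JSONEncoder()
    var wire = Data()
    for n in 1...2 {
        wire.append(try encoder.encode(MsgAPI(str: "message \(n)")))
        wire.append(0x0A) // frame delimiter added by the sender
    }

    // Cut the byte stream at an arbitrary point, as the network might.
    let split = wire.count / 3
    let chunks = [Data(wire.prefix(split)), Data(wire.suffix(from: split))]

    var framer = LineFramer()
    var decoded: [MsgAPI] = []
    for chunk in chunks {
        for frame in framer.append(chunk) {
            decoded.append(try JSONDecoder().decode(MsgAPI.self, from: frame))
        }
    }
    return decoded
}
```

The default `JSONEncoder` output contains no raw newline bytes, which is what makes `\n` usable as a delimiter here; a length prefix would be the usual alternative.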