Can't get writeAndFlush to send bytes

I need to write a simple socket relay, and I've bootstrapped two sockets that successfully receive connections. However, when I attempt to write what has been read on the one socket to the outgoing socket, no bytes are sent.

Perhaps I'm overcomplicating this with two sockets, but I need to ensure that only one socket's input gets sent to all the other socket's clients, and that the listening socket's clients don't send anything to the input socket.

I'm bootstrapping the sockets similarly:

final class CloudRelay {

  let listeningPort: UInt
  let group = MultiThreadedEventLoopGroup(numberOfThreads: max(1,System.coreCount-1))
  let channel: Channel?

  init(port: UInt) {
    listeningPort = port

    // Set up the server using a Bootstrap
    let bootstrap = ServerBootstrap(group: group)
      // Define backlog and enable SO_REUSEADDR options at the server level
      .serverChannelOption(ChannelOptions.backlog, value: 256)
      .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)

      // Handler Pipeline: handlers that are processing events from accepted Channels
      // To demonstrate that we can have several reusable handlers we start with a Swift-NIO default
      // handler that limits the speed at which we read from the client if we cannot keep up with the
      // processing through EchoHandler.
      // This is to protect the server from overload.
      .childChannelInitializer { channel in
        channel.pipeline.add(handler: BackPressureHandler()).then { v in
          channel.pipeline.add(handler: CloudRelayHandler())
        }
      }

      // Enable common socket options at the channel level (TCP_NODELAY and SO_REUSEADDR).
      // These options are applied to accepted Channels
      .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
      .childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
      // Message grouping
      .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 16)
      // Let Swift-NIO adjust the buffer size, based on actual traffic.
      .childChannelOption(ChannelOptions.recvAllocator, value: AdaptiveRecvByteBufferAllocator())

      // Bind the port and run the server
      do {
        channel = try bootstrap.bind(host: "0.0.0.0", port: Int(listeningPort)).wait()
        print("Server started and listening on \(channel!.localAddress!)")
      }
      catch {
        print("Server failed to start")
        channel = nil
      }
    }

  deinit {
    try? group.syncShutdownGracefully()
  }

}

In the input socket ChannelInboundHandler I respond to the channelRead as such, attempting to write to the other socket's channel:

// Invoked when data are received from the client
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
     ctx.write(data, promise: nil) // echo
     var buf = unwrapInboundIn(data)
     let s = buf.readString(length: buf.readableBytes)
     print("Read: \(s ?? "")")
     cloudRelay?.channel?.writeAndFlush(data, promise: nil)
}

Am I setting up the 2nd socket incorrectly to get it to write what is sent from the first socket? I do have an InboundHandler on both sockets to accept the connections, but neither is really transforming messages, just forwarding; its OutboundHandler write() handler is never being called.

Should I even have two sockets here but still limit just the one known input connection to writing to the other clients?

TIA

Thanks for your question! If you don't mind, can you confirm (or correct) that you're trying to implement the following:

  • There is one server
  • There is one "sender client" which sends data over TCP to the server
  • There are one or more "receiver clients" which receive the data that the "sender client" has sent, the data is relayed via the server

So something like this?

                                                 conn2       +--------------------+
                                             +-------------> | receiver client 1  |
                                             |               +--------------------+
                               +-----------+-+
+----------------+             |           | 
|                |  conn1      |           |     conn3       +--------------------+
| sender client  +-----------> |   server  +---------------> | receiver client 2  |
|                |             |           |                 +--------------------+
+----------------+             |           | 
                               +-----------+-+
                                             |   conn4       +--------------------+
                                             +-------------> | receiver client 3  |
                                                             +--------------------+


You're using ctx: ChannelHandlerContext and not context: ChannelHandlerContext. In SwiftNIO 1 (released in March 2018) we indeed used ctx, but in NIO 2 (released in March 2019) we switched the spelling to context. We very highly recommend using SwiftNIO 2 at this point in time. Are you planning to use NIO 1, or is this maybe a typo? If that's a typo this will cause issues.

Lastly, if you'd like an example of a server that forwards traffic from one client to another host, we have the TLSify example. TLSify allows a client to connect and speak plain text and TLSify will forward -- wrapped in TLS -- to another server. It doesn't sound 100% like what you need but it does implement forwarding data received on one Channel to another. The TLS wrapping is irrelevant for you but you can just ignore it for now.

Thanks Johannes. Yes, your picture is clearer than my words, yes you have the gist of it. I am using SwiftNIO 1 as this is a feature being added to an existing Vapor 3 app that I have yet to upgrade to 4 (on the todo list...)

I will take a look at TLSify more closely, as it does appear to be doing similar but performs the additional TLS work. Thanks for sharing an example I can work from. Will report back tonight if I'm still unable to cross all the bridges.

Sounds good! Another (less polished) example you may want to look at is the NIOChatServer and NIOChatClient. The idea is that you run one NIOChatServer and any number of NIOChatClients and anything one client says goes to all of the other clients via the server.

I reckon that most of the ideas you'll need can be pieced together from TLSify + NIOChat[Server/Client] but by all means, reach out here when you have questions!

Ok, I'll have to admit I've read through the two examples you've listed and the SwiftNIO documentation explaining channels & handlers, and I'm still not able to get the gist of what I'm supposed to do.

In the NIOChatServer ChatHandler channelActive invocation it's storing the new listening client in a Dictionary (tho when I use queue.async like NIOChatServer does in my Vapor 3 app, NIO asserts) and then on each channelRead it's writing to all child channels stored in the dictionary. This seems like the clearest and simplest approach, so I've attempted to implement that (using only 1 server listening socket that both sender clients and receiver clients connect to; I will later add logic to control which client can write, because I wrote that app and can send a token...).

However, when I implement this in my Vapor app and initiate the 2 connections simply by telnet in separate bash windows, each connection is added to the channels dictionary in the handler's channelActive, but it appears it's being added to a different handler instance each time, unlike when I run NIOChatServer, where the dictionary gains a value on each call to channelActive. I'm not sure if this is Vapor and its own NIO event loop management mucking things up or simply me using NIO 1.0 incorrectly.

I've extracted this feature into a vanilla Vapor 3 app and get the same behavior. Here's the handler code (the bootstrap is still simply as it is above):

private final class RelayCloudHandler: ChannelInboundHandler {
  public typealias InboundIn = ByteBuffer
  public typealias OutboundOut = ByteBuffer

  private var channels = [ObjectIdentifier:Channel]()

  // Invoked on client connection
  public func channelRegistered(ctx: ChannelHandlerContext) {
     print("channel registered:", ctx.remoteAddress ?? "unknown")
  }

  public func channelActive(ctx: ChannelHandlerContext) {
    self.channels[ObjectIdentifier(ctx.channel)] = ctx.channel
  }

  // Invoked on client disconnect
  public func channelUnregistered(ctx: ChannelHandlerContext) {

  }

  public func channelInactive(ctx: ChannelHandlerContext) {
    self.channels.removeValue(forKey: ObjectIdentifier(ctx.channel))
  }
   // Invoked when data are received from the client
  public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
  //      ctx.write(data, promise: nil) // echo
      var buf = unwrapInboundIn(data)
      let s = buf.readString(length: buf.readableBytes)
      print("Read: \(s ?? "")")
      // Only write the read bytes to the listeners, never to the sender client 
      // will later add logic to ignore reads from the listeners
      channels.forEach { id, channel in
        if id != ObjectIdentifier(ctx.channel) {
          channel.writeAndFlush(data, promise: nil)
        }
      }
  }

  // Invoked when channelRead has processed all the read events in the current read operation
  public func channelReadComplete(ctx: ChannelHandlerContext) {
      ctx.flush()
  }

  // Invoked when an error occurs
  public func errorCaught(ctx: ChannelHandlerContext, error: Error) {
      print("error: ", error)
      ctx.close(promise: nil)
  }
}

Perhaps I should just adapt NIOChatServer for my needs and build a command line app, as it appears I can't expose two ports on Heroku anyway. Still like to understand why plan A isn't working as I've come to understand how things work.

This will all work in NIO 1 & NIO 2, again I would very much recommend to not write any NIO 1 code at this point in time.

Leaving this aside, if you base what you're building on the chat server example, we need to understand one fundamental fact:

In most SwiftNIO programs, you do not need to worry about synchronising your state in ChannelHandlers because you construct a fresh instance per Channel. And given that each Channel always runs on the same EventLoop (-> same thread) it's always all good.
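
To make that concrete, here is a minimal sketch in plain Swift rather than real NIO code. CountingHandler and accept are made-up stand-ins (assumptions for illustration only); accept just mimics the shape of childChannelInitializer, whose closure runs once per accepted connection.

```swift
// Hypothetical stand-in for a ChannelHandler with private state (not a SwiftNIO type).
final class CountingHandler {
    private(set) var seen = [String]()
    func channelActive(_ name: String) { seen.append(name) }
}

// Mimics childChannelInitializer: `makeHandler` is invoked once per connection.
func accept(_ connections: [String], makeHandler: () -> CountingHandler) -> [CountingHandler] {
    return connections.map { name in
        let handler = makeHandler()
        handler.channelActive(name)
        return handler
    }
}

// Fresh instance per connection: each handler only ever sees its own connection.
let fresh = accept(["conn1", "conn2"]) { CountingHandler() }

// One shared instance: it sees every connection, so in real NIO code its
// state would need explicit synchronisation.
let shared = CountingHandler()
let sharedResult = accept(["conn1", "conn2"]) { shared }
```

With the fresh-instance style, a dictionary stored inside the handler can never hold more than the handler's own channel, which matches the "different handler instance" behaviour described above.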

The chat server example however is a bit different: It uses a very advanced SwiftNIO feature which allows you to share one and the same ChannelHandler instance in multiple Channels. If you're doing this, you're on your own and you need to synchronise all accesses to your state correctly. You can do this with a Lock or a DispatchQueue. But you need to make sure that you keep your critical sections (where you hold the Lock or are in queue.sync) as short as possible. And more crucially, you must not call into SwiftNIO with the lock/queue held.
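
A rough sketch of what "short critical section, no calls into NIO while holding the queue" could look like, using a serial DispatchQueue. FakeChannel and Broadcaster are hypothetical stand-ins so the snippet runs without the NIO package; in real code the values would be Channels and the write would be channel.writeAndFlush.

```swift
import Foundation

// Hypothetical stand-in for a NIO Channel; only a write call is modelled.
final class FakeChannel {
    private(set) var written = [String]()
    func writeAndFlush(_ message: String) { written.append(message) }
}

final class Broadcaster {
    // Serial queue that guards every access to `channels`.
    private let queue = DispatchQueue(label: "channels-sync")
    private var channels = [ObjectIdentifier: FakeChannel]()

    func add(_ channel: FakeChannel) {
        queue.sync { channels[ObjectIdentifier(channel)] = channel }
    }

    func broadcast(_ message: String, except sender: FakeChannel) {
        // Critical section: copy the current receivers and leave immediately.
        let receivers: [FakeChannel] = queue.sync {
            channels.filter { $0.key != ObjectIdentifier(sender) }.map { $0.value }
        }
        // Outside the critical section: only now call into the channels.
        for channel in receivers {
            channel.writeAndFlush(message)
        }
    }
}
```

The point of the copy is that the queue is already released by the time any write happens, so a slow or re-entrant write cannot block other threads that want to add or remove channels.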

Does this make sense to you? If yes and you decide that ChannelHandler instance sharing is the right approach for you, you need to be way more diligent about threading/synchronisation than you normally have to be in NIO programs.
There is also another approach which may be simpler: Instead of sharing a ChannelHandler instance, you could create a separate object (a class) that is fully thread-safe which you use to collect all channels. Then you would share an instance to your thread-safe channel collection with fresh ChannelHandlers for each Channel.
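
As a sketch of that second approach, assuming nothing beyond Foundation: ActiveChannels below is a hypothetical lock-protected collection, and Connection and RelayHandler are made-up stand-ins for a Channel and a per-channel ChannelHandler. None of these are SwiftNIO types.

```swift
import Foundation

// Hypothetical thread-safe collection; `Member` stands in for NIO's Channel.
final class ActiveChannels<Member: AnyObject> {
    private let lock = NSLock()
    private var members = [ObjectIdentifier: Member]()

    func add(_ member: Member) {
        lock.lock(); defer { lock.unlock() }
        members[ObjectIdentifier(member)] = member
    }

    func remove(_ member: Member) {
        lock.lock(); defer { lock.unlock() }
        members.removeValue(forKey: ObjectIdentifier(member))
    }

    // Returns a copy, so callers iterate and write without holding the lock.
    func all(except member: Member) -> [Member] {
        lock.lock(); defer { lock.unlock() }
        return members.filter { $0.key != ObjectIdentifier(member) }.map { $0.value }
    }
}

// Hypothetical stand-in for a connection that records what it was sent.
final class Connection {
    var received = [String]()
}

// One fresh handler per connection; every handler shares the same collection.
final class RelayHandler {
    private let channels: ActiveChannels<Connection>
    private let own: Connection

    init(channels: ActiveChannels<Connection>, own: Connection) {
        self.channels = channels
        self.own = own
    }

    func channelActive() { channels.add(own) }
    func channelInactive() { channels.remove(own) }

    func channelRead(_ message: String) {
        for other in channels.all(except: own) {
            other.received.append(message)
        }
    }
}
```

Each handler keeps only its own connection plus a reference to the shared collection, so the handlers themselves stay free of shared mutable state.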

Regarding "tho when I use queue.async like NIOChatServer does in my Vapor 3 app NIO asserts": it sounds like you did not get the locking/synchronisation quite right and NIO caught the error. Mind pasting the actual error?

Your above code just accesses self.channels without any locking/synchronisation. This will fail if you add this handler instance to multiple channels. To be honest, the chat server example does get synchronisation right, but only just and in very subtle ways. I don't think it's a good example, I'm sorry. You will see that the chat server only accesses self.channels on self.channelsSyncQueue, which makes sure that the synchronisation works. Crucially, the chat server only calls into the Channel API (which is thread-safe) from self.channelsSyncQueue, which is correct; you must not call into the ChannelHandlerContext API (which isn't thread-safe) from that queue. (I filed an issue for the NIO team to fix the example.)

Thanks again Johannes for your time to reply. I see now why NIOChatServer is doing what it is and why I was seeing different handler instances working from the echo server example.

Instead of sharing a ChannelHandler instance, you could create a separate object (a class) that is fully thread-safe which you use to collect all channels. Then you would share an instance to your thread-safe channel collection with fresh ChannelHandlers for each Channel.

When you say share an instance of the thread-safe class that will capture the channels on channelActive/channelRegistered, I assume you mean to constructor-inject a thread-safe dictionary implementation when creating the handler and reference it in the handler? E.g.

channel.pipeline.add(handler: CloudRelayHandler(channels: ActiveChannels))

And then on reads it simply performs something similar to a writeToAll() with the current set of channels in that dictionary, similar to what is in the NIOChatServer example.

Update. I got this to work by injecting an object that stores the list of channels to the handler, updating that object by adding them on channelActive() and removing on channelInactive() and then made that object thread safe and all the bridges are now crossed. Thanks again for your help @johannesweiss.

Coming back to this, I've encountered a new issue I'm struggling to resolve. The object that has the ServerBootstrap in its init() method has the following deinit:

deinit {
  try? group.syncShutdownGracefully()
}

On app shutdown (via systemctl on Linux and terminating the app on macOS) the deinit is never called. I'm also seeing that the address and port are 'alive' for a period after process termination, likely until the OS cleans things up (at least as listed in sudo lsof -i -P).

Is the port still being listed as listening in lsof related to the deinit never being called? The socket objects are properties of the App struct and I'm still figuring out how to reference them from the AppDelegate and kill them (on macOS at least; I'm also figuring out app.shutdown() with Vapor, which claims it should already end NIO on SIGTERM in Vapor 4).

No, probably not. Without seeing the specifics of the output from lsof it's hard to be sure, but probably these are leftover ports in TIME_WAIT. This state is used to handle delayed packets.

The OS will drop all sockets immediately upon process termination. If you don't clean them up manually, the OS will do the effective equivalent of calling close() for you.

Thanks for the info Cory.

The emulator and the SUT I have connect to each other, and one test is for the SUT to automatically reconnect the socket after it determines it's lost the connection. I brute-force shut down the emulator, and when the SUT gets to its retry interval it reconnects, even tho the emulator has not restarted. I'm at a loss as to how that happens.

I don't think I have a sufficiently clear understanding of the problem you're encountering, but it seems like the most likely outcome is that something else is listening on that port.
