Race condition in TicTacFish sample code?

Folks, I think I've found a race condition in the WebSocketActorSystem+Client sample code, and could use some advice on how to fix it.

I believe the startClient() function is returning the channel before it is fully upgraded to a WebSocket, so remote calls on that channel hit a fatal error if they are made too soon after the client actor system is created. This may not be a big deal for interactive code, but it makes unit testing the actor system unreliable.

Is this a case for an NSLock, like other parts of the WebSocketActorSystem use, or is there a way to solve the race condition using higher-level async/await?

extension WebSocketActorSystem {
    func startClient(host: String, port: Int) throws -> Channel {
        let bootstrap = PlatformBootstrap(group: group)
            // Enable SO_REUSEADDR.
                .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
                .channelInitializer { channel in

                    let httpHandler = HTTPInitialRequestHandler(target: .init(host: host, port: port))

                    let websocketUpgrader = NIOWebSocketClientUpgrader(
                        requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=",
                        upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in
                            channel.pipeline.addHandlers(
                                WebSocketMessageOutboundHandler(actorSystem: self),
                                WebSocketActorMessageInboundHandler(actorSystem: self)
                                // WebSocketActorReplyHandler(actorSystem: self)
                            )
                        })

                    let config: NIOHTTPClientUpgradeConfiguration = (
                        upgraders: [websocketUpgrader],
                        completionHandler: { _ in
                            channel.pipeline.removeHandler(httpHandler, promise: nil)
                        })

                    return channel.pipeline.addHTTPClientHandlers(withClientUpgrade: config).flatMap {
                        channel.pipeline.addHandler(httpHandler)
                    }
                }

        let channel = try bootstrap.connect(host: host, port: port).wait()

        return channel
    }
}
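
On the async/await half of the question: one lock-free option is to make the start function suspend until the upgrade has actually finished, by bridging the completion callback into a continuation. The sketch below only illustrates that pattern with the standard library. `FakeConnection`, `UpgradeError`, `startClientAsync`, and the `upgrade(succeed:completion:)` callback are hypothetical stand-ins for the connect-then-upgrade sequence, not NIO API.

```swift
import Foundation

// Hypothetical error type for a refused upgrade.
enum UpgradeError: Error { case rejected }

// Hypothetical stand-in for a connection whose upgrade completes
// some time after the TCP connect has already returned.
struct FakeConnection: Sendable {
    func upgrade(succeed: Bool,
                 completion: @escaping @Sendable (Result<String, Error>) -> Void) {
        // Simulate the server's upgrade response arriving later.
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
            completion(succeed ? .success("websocket") : .failure(UpgradeError.rejected))
        }
    }
}

// Suspends until the upgrade callback fires, so the caller never
// receives a connection that is still mid-upgrade. A failed upgrade
// is rethrown instead of leaving the caller waiting.
func startClientAsync(connection: FakeConnection, succeed: Bool = true) async throws -> String {
    try await withCheckedThrowingContinuation { continuation in
        connection.upgrade(succeed: succeed) { result in
            continuation.resume(with: result)
        }
    }
}
```

The point of the design is that success and failure both resume the caller exactly once, which a bare lock or semaphore does not give you for free.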

Here's my attempted fix using a DispatchSemaphore to wait for the channel to be upgraded to a WebSocket. Feedback welcome!

extension WebSocketActorSystem {
    func startClient(host: String, port: Int) throws -> Channel {
        let upgradeSemaphore = DispatchSemaphore(value: 0)
        let bootstrap = PlatformBootstrap(group: group)
            // Enable SO_REUSEADDR.
                .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
                .channelInitializer { channel in

                    let httpHandler = HTTPInitialRequestHandler(target: .init(host: host, port: port))

                    let websocketUpgrader = NIOWebSocketClientUpgrader(
                        requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=",
                        upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in
                            defer {
                                upgradeSemaphore.signal()
                            }
                            return channel.pipeline.addHandlers(
                                WebSocketMessageOutboundHandler(actorSystem: self),
                                WebSocketActorMessageInboundHandler(actorSystem: self)
                                // WebSocketActorReplyHandler(actorSystem: self)
                            )
                        })

                    let config: NIOHTTPClientUpgradeConfiguration = (
                        upgraders: [websocketUpgrader],
                        completionHandler: { _ in
                            channel.pipeline.removeHandler(httpHandler, promise: nil)
                        })

                    return channel.pipeline.addHTTPClientHandlers(withClientUpgrade: config).flatMap {
                        channel.pipeline.addHandler(httpHandler)
                    }
                }

        let channel = try bootstrap.connect(host: host, port: port).wait()
        
        upgradeSemaphore.wait()

        return channel
    }
}
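
Two caveats about the semaphore version, as far as I can tell from reading it. The `defer` signals as soon as `addHandlers` returns its future, which is not necessarily when the handlers have been added. The semaphore is also only signalled from the upgrade path, so a refused or failed upgrade would leave `wait()` blocked. A bounded wait at least turns the second case into an error. This is a minimal sketch using Dispatch only; `waitForUpgrade` and `UpgradeWaitError` are hypothetical names, not part of the sample.

```swift
import Dispatch

// Hypothetical error for an upgrade that never signalled in time.
enum UpgradeWaitError: Error { case timedOut }

// Waits for the semaphore to be signalled, but gives up after
// `seconds` instead of blocking the calling thread forever.
func waitForUpgrade(_ semaphore: DispatchSemaphore, seconds: Double) throws {
    if semaphore.wait(timeout: .now() + seconds) == .timedOut {
        throw UpgradeWaitError.timedOut
    }
}
```

In the code above, the bare `upgradeSemaphore.wait()` call would become something like `try waitForUpgrade(upgradeSemaphore, seconds: 5)`, with the timeout value chosen to suit the tests.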

Link to your implementation

Sure! It's still missing some features, but it's here:

The main thing that's missing is the ability for the server to track which clients house which actors, so the server can push messages to the clients.

We recently introduced new APIs, along with an example of how to do a WebSocket upgrade with our new async APIs. Sadly, we had to back out the new typed upgrade APIs due to a compiler bug on iOS 15 and below. However, you should be able to copy the code from the PR until we land it again.

Let us know how it goes!

Thanks for the tip to look for the pull request!

It took a bit of doing, but I managed to extract the necessary code from the PR, get it working with the latest release of NIO, and update the client initialization code to use the typed upgraders.

I decided against switching from Channel to NIOAsyncChannel because the distributed actor sample code is based on Channels, and I saw the warning for NIOAsyncChannel that says:

This type does not replace the full complexity of NIO's Channel. In particular, it
does not expose the following functionality:

  • user events
  • traditional NIO back pressure such as writability signals and the Channel/read() call

Users are encouraged to separate their ChannelHandlers into those that implement
protocol-specific logic (such as parsers and encoders) and those that implement business
logic. Protocol-specific logic should be implemented as a ChannelHandler, while business
logic should use NIOAsyncChannel to consume and produce data to the network.

The WebSocketActorSystem seems more like protocol than business logic, so I kept using Channels. That also allowed me to remove all of the @_spi(AsyncChannel) attributes so that I'm not dependent on unstable APIs.

The resulting code is no shorter or clearer than the previous version, but it fixes the race condition and does not require semaphores, so I count that as a win.

I won't repost the updated code here, but you can see it in my GitHub repository if you are interested.

In my opinion, the actor system is very much business logic. By protocol logic, the documentation means actual network protocol implementations such as HTTP or WebSocket. We now recommend that most people bridge out via NIOAsyncChannel and implement their logic with Swift Concurrency.

With the latest NIO releases you don't have to use @_spi anymore since all the APIs became regular public APIs.
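
To make the split concrete, here is a minimal sketch of business logic written purely with Swift Concurrency. It assumes the protocol layer has already decoded frames into strings, and it uses a plain `AsyncStream<String>` as a stand-in for the inbound side of an async channel. `handleMessages`, `ReplyLog`, and the echo behaviour are hypothetical and not NIO API.

```swift
// Hypothetical sink that records replies; a real system would write
// them back to the network instead.
actor ReplyLog {
    private(set) var replies: [String] = []
    func append(_ reply: String) { replies.append(reply) }
}

// Business logic: consume decoded messages as an async sequence and
// produce one reply per message. No channel handlers, locks, or
// callbacks are involved at this layer.
func handleMessages(_ inbound: AsyncStream<String>,
                    reply: (String) async -> Void) async {
    for await message in inbound {
        await reply("echo: \(message)")
    }
}
```

The loop ends when the stream finishes, which is how a closed connection would show up at this layer.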

In that case, I'll add migrating to async channels to my to-do list. Thanks for the advice!

For anyone who follows my path, be aware that there is a second PR containing the corresponding server code. If you are writing both WebSocket client and server code and want to use NIOAsyncChannel and the typed upgraders, you'll want to look at both PRs.

@FranzBusch Thanks for the advice to switch to NIOAsyncChannel! I was in over my head for a while refactoring the code, but my unit tests are passing again and the logic is much simpler.
