How do I send out asynchronous work in a channel pipeline?

A ChannelHandlerContext is not thread-safe (it raises an error if you call a method on it from outside its EventLoop), and methods on a Channel are not supposed to block, so long-running work has to be handed off as an asynchronous task somewhere else. Given both of those constraints, how are we supposed to send the output of such an asynchronous task back to the other end of the pipeline?

e.g.:

.childChannelInitializer { channel in
    channel.pipeline.addHandlers([
        BackPressureHandler(),
        POPSessionHandler(), // create/own the session state
        POPParseHandler(), // transform incoming string into POP command + args
        POPActionHandler() // <-- do some async work, fetching mail, building response, &c.
    ])
}

Let's say that POPActionHandler looks like this:

final class POPActionHandler: ChannelInboundHandler {
    typealias InboundIn = POPSessionState
    typealias InboundOut = ByteBuffer

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let sessionState = unwrapInboundIn(data)

        // can't actually do this, since we aren't in an async context now
        let responseString = await processStateAction(sessionState)

        // but ultimately, we want to send the response along
        var outBuffer = context.channel.allocator.buffer(capacity: responseString.count)
        outBuffer.writeString(responseString)
        context.writeAndFlush(wrapInboundOut(outBuffer), promise: nil)
    }

    private func processStateAction(_ state: POPSessionState) async -> String { ... }
}

Am I supposed to grab the EventLoop out of the context and somehow add a Task to it? Or do I wrap the final bit with the buffer and the context in something and send that to the loop, or what?

Aha! Yes, it's as simple as that:

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let sessionState = unwrapInboundIn(data)

        Task {
            let response = await processStateAction(sessionState)

            context.eventLoop.execute {
                var outBuffer = context.channel.allocator.buffer(capacity: response.count)
                outBuffer.writeString(response)
                context.writeAndFlush(self.wrapInboundOut(outBuffer), promise: nil)
                if sessionState.popState == .quit {
                    context.close(promise: nil)
                }
            }
        }
    }

Yup, this looks right to me!
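
For anyone who wants the same hop back onto the event loop without a hand-written Task, here is a minimal sketch of a variation, not a definitive implementation. It assumes a SwiftNIO release recent enough to ship both EventLoop.makeFutureWithTask and NIOLoopBound. The POPState and POPSessionState definitions and the body of processStateAction are hypothetical stand-ins, since the real ones are not shown in this thread. It also declares OutboundOut and uses wrapOutboundOut for the write, which is a swap from the wrapInboundOut call above.

```swift
import NIOCore

// Hypothetical stand-ins: the real session types are not shown in the thread.
enum POPState: Sendable { case transaction, quit }

struct POPSessionState: Sendable {
    var popState: POPState
}

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class POPActionHandler: ChannelInboundHandler {
    typealias InboundIn = POPSessionState
    typealias OutboundOut = ByteBuffer

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let sessionState = unwrapInboundIn(data)

        // Neither the handler nor the context is Sendable. NIOLoopBound lets
        // them cross into a @Sendable closure and traps if `.value` is read
        // from anywhere other than the given event loop.
        let loopBound = NIOLoopBound(
            (handler: self, context: context),
            eventLoop: context.eventLoop
        )

        // makeFutureWithTask runs the async closure in a Task and returns a
        // future whose callbacks are delivered on this event loop.
        context.eventLoop.makeFutureWithTask {
            await Self.processStateAction(sessionState)
        }.whenSuccess { response in
            // Back on the event loop, so touching the context is safe here.
            let (handler, context) = loopBound.value
            var outBuffer = context.channel.allocator.buffer(capacity: response.utf8.count)
            outBuffer.writeString(response)
            context.writeAndFlush(handler.wrapOutboundOut(outBuffer), promise: nil)
            if sessionState.popState == .quit {
                context.close(promise: nil)
            }
        }
    }

    // Placeholder body (an assumption): the real handler would fetch mail and
    // build the response here. Static so the Task does not capture `self`.
    private static func processStateAction(_ state: POPSessionState) async -> String {
        return "+OK\r\n"
    }
}
```

The behavior should match the Task plus eventLoop.execute version. The main difference is that the thread hop is handled by the future, and NIOLoopBound turns an accidental off-loop access into an immediate crash rather than a silent race. This sketch only handles the success case; if processStateAction could throw, whenComplete would be the place to deal with the failure.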