Do swift-nio-ssh child channels utilize read()?

Hello all,

I had a previous post here asking about how Swift NIO can utilize backpressure, and the discussion focused around the context.read() actions.

I've made progress on my sftp project but hit a weird issue. With swift-nio-ssh, the handlers are added in a "child channel", and my channel handler's read(context:) is never called for some reason.

In this code (jlsftp/SftpServerChannelHandler.swift at 130a611f7a7acdb6f3d80d206800dbf5379fecdd · Jman012/jlsftp · GitHub), context.fireErrorCaught is triggered and read(context:) is not called. After brushing up on my original post (Handling long streams of data, or handling header and body efficiently?), I found that read doesn't work the way I intended: I expected further calls to channelRead(context:data:) to be blocked until read(context:) was passed along again.

So I made some alterations and queued up the incoming messages, because a few can still arrive between the point where reading from the socket stops and the point where data stops flowing through the channel handler pipeline: jlsftp/SftpServerChannelHandler.swift at 607b0feb10ee30e689d4d31f28170318883927f3 · Jman012/jlsftp · GitHub. The code now processes no more than one SftpMessage at a time, but the queue fills up more than it should, reaching up to 40 entries when retrieving 32 KB chunks of a ~3 MB file. read(context:) is still not being called, so the queue continues to fill up.

I haven't tested this outside of the swift-nio-ssh child channel paradigm, but my thinking is that this is not a true channel connected to a socket, so the reads aren't going through exactly? I also see this in the source: swift-nio-ssh/SSHChildChannel.swift at 5a28f0d22e2057b09243f8a7e214938698a4b8c3 · apple/swift-nio-ssh · GitHub.

Some clarification would be greatly appreciated!

Hi James,

I looked into the SSH tests, and we verify that read(context:) is called correctly.

Looking at your code, it might be worth explaining the relationship between read(context:), channelRead(context:data:) and channelReadComplete(context:).

  1. A channel whose default options have not been changed will read up to four times from the socket at a time. This means that your channelRead(context:data:) method might be invoked up to four times in a row if you consume the data directly from the socket. If you have intermediary channel handlers (like NIOSSHHandler in your case), this number might increase or decrease depending on the framing of the transported messages. The main lesson from this should be: never assume that your channelRead(context:data:) method is called just once before the next read().

  2. After all channelRead(context:data:) invocations are completed, you will eventually receive an invocation of channelReadComplete(context:). Once you have received this invocation, you know that SwiftNIO has stopped reading and you will not receive further messages as long as context.read() has not been called.

  3. If you forward the ChannelReadComplete event by calling context.fireChannelReadComplete() in your channelReadComplete(context:), the ChannelReadComplete will reach the end of the channel pipeline. There the channel auto read (another default NIO setting) will kick in and trigger a read event into the outbound direction of the pipeline. You will get an invocation to read() now. You can hold this event until you have consumed all data from the last channelRead invocations.
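The ordering described in the list above can be observed with a small diagnostic handler. The following is only a sketch: `EventOrderRecorder` is a hypothetical name that does not exist in SwiftNIO or swift-nio-ssh, and it assumes the handler is added to the same (child) channel pipeline as the handler under investigation.

```swift
import NIO

/// Hypothetical diagnostic handler (not part of SwiftNIO): records the order in
/// which the three callbacks fire and forwards every event unchanged.
final class EventOrderRecorder: ChannelDuplexHandler {
    typealias InboundIn = NIOAny
    typealias InboundOut = NIOAny
    typealias OutboundIn = NIOAny
    typealias OutboundOut = NIOAny

    private(set) var events: [String] = []

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Inbound: one invocation per message; may happen several times in a row.
        events.append("channelRead")
        context.fireChannelRead(data)
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        // Inbound: the current batch of channelRead invocations is over.
        events.append("channelReadComplete")
        context.fireChannelReadComplete()
    }

    func read(context: ChannelHandlerContext) {
        // Outbound: a request to read more data. Forwarded immediately here;
        // a backpressure-aware handler would hold it back instead.
        events.append("read")
        context.read()
    }
}
```

If "read" never shows up in `events`, the read event is not reaching that position in the pipeline, which narrows down where to look.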

To get the behavior you expect, your code could look something like this:

import NIO

// only here to make code compile. use your type
enum SftpMessage {
    case foo
    case bar
}

// only here to make code compile. use your type
protocol SftpServer {
    func handle(message: SftpMessage, on: EventLoop) -> EventLoopFuture<Void>
}

class SftpServerChannelHandler: ChannelDuplexHandler {
    typealias InboundIn = SftpMessage
    typealias InboundOut = Never
    typealias OutboundIn = Never
    typealias OutboundOut = SftpMessage

    let server: SftpServer
    
    init(server: SftpServer) {
        self.server = server
    }
     
    enum State {
        case waitingForMoreMessages
        case serverProcessesMessage(CircularBuffer<SftpMessage>, readEventCaught: Bool)
    }
    
    private var state: State = .waitingForMoreMessages
    
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // this might be called a number of times...
        let message = self.unwrapInboundIn(data)
        
        switch self.state {
        case .waitingForMoreMessages:
            self.state = .serverProcessesMessage(.init(), readEventCaught: false)
            self.server.handle(message: message, on: context.eventLoop).whenComplete { result in
                switch result {
                case .success:
                    self.serverDidFinishProcessing(context: context)
                case .failure(let error):
                    self.serverFailedToProcess(error)
                }
            }
            
        case .serverProcessesMessage(var buffer, let readEventCaught):
            buffer.append(message)
            self.state = .serverProcessesMessage(buffer, readEventCaught: readEventCaught)
        }
    }
    
    func channelReadComplete(context: ChannelHandlerContext) {
        // after this is called, `channelRead(context:data:)` will not be called again until
        // `context.read()` is invoked.
        context.fireChannelReadComplete()
    }

    func read(context: ChannelHandlerContext) {
        switch self.state {
        case .waitingForMoreMessages:
            // all messages are processed. We definitely want to read more...
            context.read()
            
        case .serverProcessesMessage(_, readEventCaught: true):
            preconditionFailure("This state is invalid. read will only be invoked once. We must have called context.read() in the meantime and forgot to change our state")
            
        case .serverProcessesMessage(let buffer, readEventCaught: false):
            // we are still processing messages. Let's mark that we need to call `context.read()`
            // once we have consumed all
            self.state = .serverProcessesMessage(buffer, readEventCaught: true)
        }
    }
    
    private func serverDidFinishProcessing(context: ChannelHandlerContext) {
        switch self.state {
        case .waitingForMoreMessages:
            preconditionFailure("If we did process data, we can't be waiting for more data already")
            
        case .serverProcessesMessage(let buffer, readEventCaught: true) where buffer.isEmpty:
            // we have received a read invocation before and we are now ready to consume more
            // messages. Now is the time to invoke `context.read()`
            self.state = .waitingForMoreMessages
            context.read()

        case .serverProcessesMessage(let buffer, readEventCaught: false) where buffer.isEmpty:
            // we have not received a `read()` invocation while we processed the request. we will
            // receive more invocations to `channelRead`, which we can process right away, or an
            // invocation to `read()`, which we will forward in this state right away.
            self.state = .waitingForMoreMessages
            
        case .serverProcessesMessage(var buffer, let readEventCaught):
            assert(!buffer.isEmpty)
            let messageToProcess = buffer.removeFirst()
            self.state = .serverProcessesMessage(buffer, readEventCaught: readEventCaught)
            
            self.server.handle(message: messageToProcess, on: context.eventLoop).whenComplete { result in
                switch result {
                case .success:
                    self.serverDidFinishProcessing(context: context)
                case .failure(let error):
                    self.serverFailedToProcess(error)
                }
            }
        }
    }
    
    private func serverFailedToProcess(_ error: Error) {
        // your error handling here!
    }
}
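
The state transitions in the handler above can also be pulled out into a plain value type, which makes them testable without a channel. This is only a sketch under a few assumptions: `ReadGate` and `Action` are hypothetical names, a plain `Array` stands in for `CircularBuffer` so that no NIO import is needed, and a repeated read request while a message is being processed is simply remembered rather than treated as a precondition failure.

```swift
/// Hypothetical value type mirroring the `State` enum of the handler above.
struct ReadGate<Message: Equatable> {
    /// What the channel handler should do after a transition.
    enum Action: Equatable {
        case process(Message)  // hand this message to the server
        case forwardRead       // call context.read()
        case wait              // nothing to do right now
    }

    private var isProcessing = false
    private var queue: [Message] = []
    private var readPending = false

    /// Call from channelRead(context:data:).
    mutating func messageReceived(_ message: Message) -> Action {
        if isProcessing {
            queue.append(message)
            return .wait
        }
        isProcessing = true
        return .process(message)
    }

    /// Call from read(context:).
    mutating func readRequested() -> Action {
        if isProcessing {
            // Hold the read event until all queued messages are consumed.
            readPending = true
            return .wait
        }
        return .forwardRead
    }

    /// Call when the server has finished processing the current message.
    mutating func processingFinished() -> Action {
        precondition(isProcessing, "finished processing without a message in flight")
        if !queue.isEmpty {
            return .process(queue.removeFirst())
        }
        isProcessing = false
        if readPending {
            readPending = false
            return .forwardRead
        }
        return .wait
    }
}
```

The handler would then only translate each returned `Action` into the matching call (`server.handle`, `context.read()`, or nothing), so the ordering rules live in one place.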

I hope that helps, if you have further questions, please reach out!

If you want to unit test your behavior, this thread might also be interesting to you:

Thank you for the information, @fabianfett!

This is indeed true. It was my assumption at first, but after re-reading Lukasa's post in the mentioned thread, I realized this was the case. I had some preconditions that if read(context:) would return false, then channelRead(context:data:) would not be called. I've changed some of my code to handle this better.

That code example is very in-depth! I will definitely reference this. I think that is the clearest and simplest example I have seen so far explaining how to properly stop a read and resume it (by calling context.read() again, which my code is not doing right now).

This was actually a past post of mine. I'm glad it's coming in useful when talking about these topics.

As for the actual issue, I first tried analyzing my bootstrap and handlers in the pipeline. I verified that channelReadComplete(context:) was indeed being called, I changed my serverChannelOptions to match that of the NIOSSHServer example code, and I fixed an issue where I wasn't forwarding user inbound events from one of my handlers. That still did not fix the issue. However, I took a look at my package file and realized I was on 0.0.2 of swift-nio-ssh! I updated that to the latest (and updated swift-nio from 2.30.0 to latest as well), fixed some breaking changes, and now I'm seeing that read(context:) is finally being called properly. That is entirely my fault.

Right now my server is deadlocking, but that is because I need to modify my handlers to call context.read() properly. Once I do that, then I expect everything to work properly.

Thanks for the help!
