Pipeline handler ordering and other questions

I'm currently implementing an MQTT broker using SwiftNIO and I'm having trouble understanding a few things.

First and foremost, I can't seem to get a handle on the order in which pipeline handlers are called. I'm currently setting up like this:

.childChannelInitializer { channel in
    channel.pipeline.addHandler(DebugInboundEventsHandler(), name: "DebugInbound1").flatMap { _ in
        channel.pipeline.addHandler(ByteToMessageHandler(PacketDecoder()), name: "PacketDecoder")
    }.flatMap { _ in
        channel.pipeline.addHandler(DebugInboundEventsHandler(), name: "DebugInbound2")
    }.flatMap { _ in
        channel.pipeline.addHandler(DebugOutboundEventsHandler(), name: "DebugOutbound2")
    }.flatMap { _ in
        channel.pipeline.addHandler(PacketHandler(), name: "PacketHandler")
    }.flatMap { _ in
        channel.pipeline.addHandler(DebugOutboundEventsHandler(), name: "DebugOutbound1")
    }
}

My understanding from the documentation in ChannelPipeline.swift is that inbound events would be processed in order DebugInbound1, PacketDecoder, DebugInbound2, PacketHandler (as they implement ChannelInboundHandler) and outgoing events in order DebugOutbound1, DebugOutbound2 (as they implement ChannelOutboundHandler). I'm currently triggering a writeAndFlush from PacketHandler, and so expected 2 sets of logs. However, only DebugOutbound2 is called for the write. Which makes sense if all handlers are in one sequence, but the documentation seems to imply that inbound and outbound events are processed separately. Or does a write from a ChannelInboundHandler change that behavior somehow?

Second, are there any good abstractions for a read / write interface to ByteBuffer and Data? I'm creating a core parsing and packet representation library and would like to use the same logic for my client and server side consumers, just with different underlying byte representations. However, there doesn't seem to be any common interface between the two, so before I create my own I thought I'd see if something existed already.

Third, while writing my ByteToMessageDecoder for the basic packet parsing, I was somewhat confused by the DecodingState type. The .needMoreData state is understandable but I'm not sure what the .continue state is for exactly. Reading through the source, it seems like .needMoreData spins the run loop to get more data but ultimately the path is very similar to .continue, so I'm not sure what the difference is.

The ChannelHandlers are in one sequence, but not all ChannelHandlers process all events. This is what the documentation is trying (and potentially failing) to get across. This leads to the behaviour you’ve observed, where writes issuing from a ChannelHandler are only seen by ChannelOutboundHandlers that are closer to the head of the Channel than the initiating ChannelHandler. Any ChannelHandler that is not a ChannelOutbound handler will not see the write, regardless of its position in the pipeline.

Note that this only happens when triggering events using the ChannelHandlerContext. A ChannelHandlerContext is an object that essentially represents the specific location in the pipeline of a given ChannelHandler. If you ever want to trigger a write that traverses the entire Channel, regardless of which entity issued it, you can call Channel.write instead of ChannelHandlerContext.write. In general, though, it is unexpected for ChannelHandlers to do this.
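To make that concrete, here is a hedged sketch (the handler is invented for illustration and is not from the post above) showing the two ways of issuing a write from inside an inbound handler:

```swift
import NIO

// Illustrative only: an inbound handler that echoes data back.
final class EchoHandler: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let buffer = self.unwrapInboundIn(data)

        // Starts at THIS handler's position and traverses only the
        // outbound handlers between here and the head of the pipeline:
        context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)

        // Would instead enter the pipeline at the tail and traverse
        // every outbound handler (generally unexpected from a handler):
        // context.channel.writeAndFlush(buffer, promise: nil)
    }
}
```

With the pipeline from the original post, the context-based call is why only DebugOutbound2 logs the write: it was added before PacketHandler and so sits closer to the head, while DebugOutbound1 sits between PacketHandler and the tail and is never visited.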

Not that I am aware of: in general, the applications I know of that share code between client and server have thus far used ByteBuffer on both sides.

.continue is a request for the ByteToMessageHandler to immediately invoke decode again.

The reason for this model is that in general single-step parsers are easier to maintain and understand than complex multi-pass parsers. They allow NIO to manage your buffering for you, and encourage you to break your parsing operations down into operations that can be completed atomically.

It also allows us to abstract away the stream nature of the data that passes through most Channels. As an example, consider a simple length-delimited protocol, where each chunk of data is preceded by a byte that indicates how long it is.

You might want to write a ByteToMessageDecoder like this:

struct LengthPrefixedDecoder: ByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        let originalReaderIndex = buffer.readerIndex

        guard let length = buffer.readInteger(as: UInt8.self),
              let payload = buffer.readSlice(length: Int(length)) else {
            buffer.moveReaderIndex(to: originalReaderIndex)
            return .needMoreData
        }
        
        context.fireChannelRead(self.wrapInboundOut(payload))
        return // What should we return here?
    }
}

Obviously in the early-exit guard we want to return .needMoreData, but what should we return at the end of the successful parse?

If we return .needMoreData, we will not be called again until more data is read from the network, whereas if we return .continue we'll be immediately invoked again. The reason we want to return .continue is because there may be more data in that buffer. For example, we might have received two messages at once, and we want to make sure we parse them both.

In essence, .continue means "I have meaningfully consumed some data and made some progress, please keep spinning the loop". Note that NIO has a short-circuit here, and if there is no more data to provide you, returning .continue will not actually invoke you again.


Thanks, I figured it was something like that. Most of the documentation I've found so far has dealt with the pipeline as a whole, with some mention of inter-handler communication through the context in the API docs. Is there a best practices or otherwise combined documentation anywhere?

That doesn't seem possible without importing all of NIO. And even if that was easy, I don't think I'd want to expose a new fundamental type to the consumer of my iOS library. I'll see if I can create a common read / write interface and how I might be able to handle Data and ByteBuffer.

I think some of my confusion here comes from the documentation for decode(context:buffer:). It says:

/// Decode from a ByteBuffer. This method will be called till either the input
/// ByteBuffer has nothing to read left or DecodingState.needMoreData is returned.

This doesn't mention the .continue state and at the same time seems to indicate the readableBytes property of ByteBuffer may also have an impact on the behavior, perhaps even stopping execution if there are 0 readableBytes. Does the state of the buffer have any impact on behavior here? Also, the return documentation doesn't mention anything about the buffer, just the two DecodingStates:

/// - returns: DecodingState.continue if we should continue calling this method or DecodingState.needMoreData if it should be called
// again once more data is present in the ByteBuffer.

"Continue calling this method" is very confusing, as either way it will be a new call to decode that picks up the new data, not somehow continuing executing from whatever point .continue was called. Right?

(I think this documentation is broken in Xcode due to the last line being a comment and not a doc comment.)

On a related note, for a single ByteToMessageDecoder (one created for each pipeline / connection, right?), is the same buffer reused between all subsequent decode calls, just with the readerIndex moved ahead as I consume bytes? Does this mean that I could see multiple messages in the buffer, or are they kept separate?

Additionally, what is decodeLast intended to be used for? None of the examples are very clear, and its documentation suffers from the same issue as decode. It says:

/// This method is called once, when the ChannelHandlerContext goes inactive (i.e. when channelInactive is fired)

Yet the return documentation states:

/// - returns: DecodingState.continue if we should continue calling this method or DecodingState.needMoreData if it should be called
// again when more data is present in the ByteBuffer.

That seems contradictory. Will it actually be called again? If so, does that mean when seenEOF == false I should just redirect to my decode logic?

Finally, for fun, here's my current decode implementation. Aside from the Data bridging I'm using to make things compatible with my current parsing code, any issues?

guard buffer.readableBytes >= 2 else { return .needMoreData }
do {
    let (count, remainingLength) = try Data(buffer.getBytes(at: buffer.readerIndex + 1, length: buffer.readableBytes - 1) ?? []).parseRemainingLength()
    
    guard buffer.readableBytes >= (1 + Int(count) + Int(remainingLength)) else { return .needMoreData }
    
    guard let rawControlByte: UInt8 = buffer.readInteger(),
        let controlByte = ControlByte(byte: rawControlByte),
        let _ = buffer.readSlice(length: Int(count)),
        let rest = buffer.readSlice(length: Int(remainingLength)) else { throw ParseError.error }
    
    let fixedHeader = FixedHeader(controlByte: controlByte, remainingLength: remainingLength)
    switch fixedHeader.controlByte.type {
    case .connect:
        let connect = try Connect(packet: Data(rest.getBytes(at: 0, length: rest.readableBytes)!))
        let packet = SomePacket.connect(connect)
        context.fireChannelRead(wrapInboundOut(packet))
    default:
        // Unhandled packet, disconnect.
        print("Unhandled packet: \(buffer), disconnecting.")
        context.close(promise: nil)
    }
    
    return .continue
} catch {
    if (error as? RemainingLengthError) == RemainingLengthError.incomplete {
        return .needMoreData
    } else {
        throw error
    }
}

Sadly there is not: all the documentation we have is the Jazzy-generated API documentation and the README. I'd like to have more narrative documentation but I haven't familiarised myself with the way Jazzy handles narrative documentation currently.

Yup, that's the limitation, and I definitely understand the concern there.

In some ways you can think of ByteBuffer as conceptually like Data plus a pair of indices offsetting into it. This is not quite right (ByteBuffer handles uninitialised memory differently) but it's a good enough abstraction to get by. This means you can probably make Data fit ByteBuffer's interface by wrapping it in a structure that holds a reader and writer index into the underlying Data, and then shimming together the APIs you need.
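As an illustration, a minimal sketch of such a shim might look like this (the type and method names are invented here, and only a tiny subset of ByteBuffer-style reads is covered):

```swift
import Foundation

// Illustrative shim: a Data plus a reader index, mimicking a small
// subset of ByteBuffer's reading API. Not a full replacement: no
// writer index, no uninitialised-memory handling.
struct DataReadBuffer {
    private let storage: Data
    private(set) var readerIndex: Data.Index

    init(_ data: Data) {
        self.storage = data
        self.readerIndex = data.startIndex
    }

    var readableBytes: Int {
        storage.distance(from: readerIndex, to: storage.endIndex)
    }

    // Reads a big-endian fixed-width integer, or returns nil (consuming
    // nothing) if not enough bytes remain.
    mutating func readInteger<T: FixedWidthInteger>(as type: T.Type = T.self) -> T? {
        let size = MemoryLayout<T>.size
        guard readableBytes >= size else { return nil }
        var value: T.Magnitude = 0
        for _ in 0..<size {
            value = (value << 8) | T.Magnitude(storage[readerIndex])
            readerIndex = storage.index(after: readerIndex)
        }
        return T(truncatingIfNeeded: value)
    }

    // Reads `length` bytes as a sub-Data, or returns nil if not enough remain.
    mutating func readData(length: Int) -> Data? {
        guard readableBytes >= length else { return nil }
        let end = storage.index(readerIndex, offsetBy: length)
        defer { readerIndex = end }
        return storage.subdata(in: readerIndex..<end)
    }
}
```

The same parsing logic could then be written once against whichever of the two "buffer" types is in play, at the cost of maintaining the shim.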

I'd have argued this mentions the .continue state by way of omission: if the method will be called until either the ByteBuffer has nothing to read left or until .needMoreData is returned, then presumably returning .continue will lead to this method continuing to be called. That said, it could definitely be called out more clearly.

As to whether the readableBytes count affects whether the decode method will be called: yes, it does. I think I mentioned this above but I'll restate: returning .continue is a signal to the ByteToMessageHandler that you haven't checked whether there are more bytes available in the buffer, but that if there are, you believe you might be able to make forward progress. If there are no bytes in the buffer then there is no point in calling decode, so NIO won't.

Yes, that's right. decode is not a coroutine, and so we can only ever call it again.

This ambiguity seems to derive from the fact that the NIO developers have a mental model of how decode is called that does not match yours. This is useful to know, because it means we can clean the documentation up to clarify it. In this case, I think the clarification that is needed is that decode is called in a loop until either .needMoreData is returned or the ByteBuffer has readableBytes == 0. That's what is meant by "continue calling this method". But if you're not 100% confident of that looping behaviour, I can definitely see how the sentence would be ambiguous.
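That loop can be sketched as follows (this is an illustration of the contract only, using plain arrays instead of ByteBuffer; it is not NIO's actual driver code):

```swift
// Illustrative sketch of how a ByteToMessageHandler-style driver
// invokes decode. Names and structure are simplified.
enum DecodingState { case `continue`, needMoreData }

func drive(decode: (inout ArraySlice<UInt8>) -> DecodingState,
           buffer: inout ArraySlice<UInt8>) {
    while !buffer.isEmpty {
        // The loop stops when decode asks for more data...
        if decode(&buffer) == .needMoreData { return }
        // ...or, on .continue, when no readable bytes remain
        // (the `while` condition: the short-circuit mentioned above).
    }
}
```

Each iteration is a fresh call to decode; nothing "resumes" inside a previous call.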

Mu. :wink:

The values of the reader and writer index of the ByteBuffer are not part of the observable state of the ByteToMessageHandler outside of a call to decode. They are valid and respected throughout that call, and after that call their values can and do change. The only guarantee made about the buffer is that any byte that is "readable" (that is, the byte is in the section of the buffer between the readerIndex and the writerIndex) will be in the buffer the next time decode is called, and will be in the same place relative to the readerIndex.

In practice, under the hood the ByteToMessageHandler will reallocate and potentially shrink the storage buffer as it sees fit in order to avoid wasting memory, so you absolutely mustn't rely on readerIndex having the same numerical value between two calls to decode.

However:

Yes, you can see multiple messages in the buffer. This is the result of ByteToMessageHandler naturally operating on streams of data. Keeping the boundaries of the data returned from individual socket reads is a non-goal, because those boundaries are non-semantic: they just happen to reflect the state of the socket operation. The data has come from the network as a stream, and your code should think of it that way.

decodeLast is a "bring out your dead" method. It's called when we know that no further data will ever be received by this ChannelHandler. This can happen either because the network connection has been closed, or because your ByteToMessageDecoder is being removed from the pipeline.

We notify you of this in this way for the following reasons:

  1. In some protocols, reading EOF is semantic. This happens in HTTP for example: some messages are delimited by connection closure. In this case, you may want to feed EOF to your parser implementation.
  2. You may have some bytes left over and need to decide what to do with them. The HTTPRequestDecoder is a good example here, as it has a RemoveAfterUpgradeStrategy enum that controls what it will do with excess data in decodeLast: it can fire them as an error, fire them as bytes, or drop them. This is basically your list of options too.

It's not contradictory: it will be called again. Once more, decodeLast is called in a loop until either of these conditions is met.

No. seenEOF == false means that your handler is being removed. It's useful only as a flag for the cases where EOF is semantic for your protocol: if it isn't, you can safely ignore it.
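For a protocol where EOF is not semantic (as is likely the case here), a decodeLast might look something like this sketch (illustrative only; it goes inside your ByteToMessageDecoder conformance, and picking the error option is just one of the choices listed above):

```swift
// Illustrative sketch only: a decodeLast for a protocol where EOF
// carries no meaning of its own.
mutating func decodeLast(context: ChannelHandlerContext,
                         buffer: inout ByteBuffer,
                         seenEOF: Bool) throws -> DecodingState {
    if seenEOF && buffer.readableBytes > 0 {
        // A real EOF arrived mid-message: the leftover bytes can never
        // form a complete packet. Fire an error (or forward/drop them).
        context.fireErrorCaught(ParseError.error) // ParseError from the post above
    }
    // If seenEOF == false we're just being removed from the pipeline;
    // ignore the leftovers and let whatever replaces us deal with them.
    return .needMoreData
}
```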

Here are some notes.

  1. You're using a Data(ByteBuffer.getBytes()) construction quite a bit. Are you intentionally copying the bytes out? You can avoid the copy by importing NIOFoundationCompat and using ByteBuffer.getData() instead.
  2. The first line (that wants to call Data.parseRemainingLength) is pretty inefficient: it creates a whole new Data object containing all the bytes from the ByteBuffer. Is it possible to implement parseRemainingLength on top of Collection? If it is, you can use ByteBuffer.readableBytesView to get a Collection without copying.
  3. Your guard has a line that does let _ = buffer.readSlice(length: Int(count)). I assume you're doing this to allow you to write the guard let, but if you ever need only the side effect of the read method you can just call moveReaderIndex(forwardBy:).
  4. Instead of using rest.getBytes() (or rest.getData() if you migrate to that), why not use rest.readBytes()? It requires you do guard var instead of guard let, but it'll avoid the need to specify where you're reading from. In general read is better than get when you can safely use it (that is, when you're consuming bytes).
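The notes above might translate into something like this sketch (illustrative only; the function is invented just to show each API side by side):

```swift
import NIO
import NIOFoundationCompat

// Illustrative applications of the notes above, not code from the post.
func sketch(buffer: inout ByteBuffer) {
    // Notes 1/2: copy bytes out as Data without an intermediate [UInt8]:
    let data: Data? = buffer.getData(at: buffer.readerIndex,
                                     length: buffer.readableBytes)

    // Note 2: a copy-free Collection view over the readable bytes:
    let view: ByteBufferView = buffer.readableBytesView

    // Note 3: skip bytes when only the side effect is wanted:
    buffer.moveReaderIndex(forwardBy: 1)

    // Note 4: prefer read* over get* when consuming bytes:
    let consumed: Data? = buffer.readData(length: buffer.readableBytes)

    _ = (data, view, consumed)
}
```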

Otherwise this looks very good! I've also opened an issue to improve the ByteToMessageDecoder docs.

Thanks @lukasa! Between your response and the documentation update, it's much clearer now. Once I get the base server working I hope to unify my parsing logic, as ByteToMessageDecoder works very similarly to how Network.framework lets you handle parsing.

Eventually I'd like to remove any Foundation dependency I have in my NIO-based code, if only to make cross platform deployment easier. However, the basic packet models I've already written use Data extensively, since I developed the initial version for Network.framework usage. I've rewritten the packet parsing using ByteBuffer, so hopefully I can find a way to abstract that logic to treat Data in a similar way.

I've updated my parsing logic using a few ByteBuffer extensions and throwing logic rather than optionals. I think it's much more efficient and far simpler now that I've moved the type parsing into initializers:

guard buffer.readableBytes >= 2 else { return .needMoreData }
do {
    let (count, remainingLength) = try buffer.getRemainingLength(at: buffer.readerIndex + 1)
    guard buffer.readableBytes >= (1 + Int(count) + remainingLength) else { return .needMoreData }
    
    let fixedHeader = try FixedHeader(buffer: &buffer)
    let packet = try Packet(fixedHeader: fixedHeader, buffer: &buffer)
    context.fireChannelRead(wrapInboundOut(packet))
    
    return .continue
} catch {
    if (error as? RemainingLengthError) == RemainingLengthError.incomplete {
        return .needMoreData
    } else {
        throw error
    }
}

Where my extensions are:

extension ByteBuffer {
    enum Error: Swift.Error {
        case readFailed(fromBuffer: String, ofType: String)
    }
    
    mutating func readEncodedString() throws -> String {
        guard let length: UInt16 = readInteger() else {
            throw Error.readFailed(fromBuffer: description, ofType: "UInt16")
        }
        
        guard let string = readString(length: Int(length)) else {
            throw Error.readFailed(fromBuffer: description, ofType: "String of length \(length)")
        }
        
        return string
    }
    
    mutating func readInteger<T: FixedWidthInteger>(endianness: Endianness = .big, as: T.Type = T.self) throws -> T {
        guard let integer = readInteger(endianness: endianness, as: `as`) else {
            throw Error.readFailed(fromBuffer: description, ofType: "\(T.self)")
        }
        
        return integer
    }
    
    func getRemainingLength(at newReaderIndex: Int) throws -> (count: UInt8, length: Int) {
        var multiplier: UInt32 = 1
        var value: Int = 0
        var byte: UInt8 = 0
        var currentIndex = newReaderIndex
        repeat {
            guard currentIndex != (readableBytes + 1) else { throw RemainingLengthError.incomplete }
            
            guard multiplier <= (128 * 128 * 128) else { throw RemainingLengthError.malformed }
            
            guard let nextByte: UInt8 = getInteger(at: currentIndex) else { throw RemainingLengthError.incomplete }
            
            byte = nextByte
            
            value += Int(UInt32(byte & 127) * multiplier)
            multiplier *= 128
            currentIndex += 1
        } while ((byte & 128) != 0) // && !isEmpty
        
        return (count: UInt8(currentIndex - newReaderIndex), length: value)
    }
    
    mutating func readRemainingLength() throws -> Int {
        let (count, length) = try getRemainingLength(at: readerIndex)
        moveReaderIndex(forwardBy: Int(count))
        
        return length
    }
}

And for fun, the initializer for FixedHeader, since it's very similar to how the rest of my parsers work:

extension FixedHeader {
    init(buffer: inout ByteBuffer) throws {
        let controlByte = try ControlByte(try buffer.readInteger())
        let remainingLength = try buffer.readRemainingLength()
        
        self = .init(controlByte: controlByte, remainingLength: UInt32(remainingLength))
    }
}

This is pretty much unrelated to this thread, but I just wanted to point out an issue in the above code. The general problem is that it's possible to advance your parsing state and then fail the parse without resetting that state. That might be okay if you never recover, but often in network programming you receive a partial message first (which yields a failing parse) and later (after more bytes arrive) you try to parse again.
For example, imagine you have 3 bytes coming in: 0x00 0x03 0xff. readInteger will succeed and give you a length of 3 bytes. After that however, we only have one byte left (0xff), so readString will fail. In that case, we have read off the first two bytes and then failed, which isn't ideal, because usually you'd expect a method that fails to leave everything exactly as it was before the call.

Especially for ByteBuffer extensions that can be important because a user might try to invoke them on partial input (ie. further bytes might arrive later).

A pattern that works well (because ByteBuffer is a value type) is:

    mutating func readEncodedString() throws -> String {
        let saveSelf = self // store the indices so we can reset the ByteBuffer
        guard let length: UInt16 = readInteger() else {
            self = saveSelf // reset to original state
            throw Error.readFailed(fromBuffer: description, ofType: "UInt16")
        }
        
        guard let string = readString(length: Int(length)) else {
            self = saveSelf // reset to original state
            throw Error.readFailed(fromBuffer: description, ofType: "String of length \(length)")
        }
        
        // all good, the parsing worked.
        return string
    }

Now, there's not much that can go wrong because your extension will always reset the ByteBuffer to its original state in case something goes wrong. Also multiple of those parsing methods compose well under the premise that they either succeed or reset the state to exactly as they found it on failure.

I want to also add that since very recently (NIO 2.2.0), NIO also comes with a tool that helps find these sorts of problems: ByteToMessageDecoderVerifier (in the NIOTestUtils module) will run ByteToMessageDecoders in various different modes, such as:

  • drip feeding bytes one by one (this would've found the above issue)
  • feeding lots of messages in one go
  • feeding multiple messages in different permutations

You can find an example use here.

I thought I had avoided this issue by basing all of my extensions on ByteBuffer's existing read* methods, which use the get* methods underneath and only move the readerIndex forward if that succeeds. Did I misunderstand how these work? I can see an issue for my extensions if they partially fail, as there could be a partially successful read, so if that's what you meant, thanks, I'll add it.

Nice. I was wondering how I was supposed to test this stuff.

This is exactly what @johannesweiss is getting at: if the first read succeeds but the second fails, you've partially consumed bytes and need to roll the reader index back.

Yep, I think I was just confused by the rollback happening both after a failed readInteger() and a failed readString(), when only the one after readString() seems necessary, no?

Note that the "partial reads" also occur between method calls. You have these two calls in a row:

    let fixedHeader = try FixedHeader(buffer: &buffer)
    let packet = try Packet(fixedHeader: fixedHeader, buffer: &buffer)

Each of these is composed of multiple reads, any of which may fail, but those reads are conceptually sequential as well. This means not only should each of these methods reset the reader index when one of its own reads fails, but the composite of the two methods should also reset the reader index if the second fails after the first has succeeded.

Happily, the same trick works here.

Luckily I don't need to do that very often, but perhaps it would be beneficial for me to come up with a pattern for attempting many reads and then resetting if they fail. Right now I think I only need to handle it in my ByteToMessageDecoder, as that would reset the buffer after a failed read at any depth, so this should handle it, no?

guard buffer.readableBytes >= 2 else { return .needMoreData }
let originalBuffer = buffer
do {
    let (count, remainingLength) = try buffer.getRemainingLength(at: buffer.readerIndex + 1)
    guard buffer.readableBytes >= (1 + Int(count) + remainingLength) else { return .needMoreData }
    
    let fixedHeader = try FixedHeader(buffer: &buffer)
    let packet = try Packet(fixedHeader: fixedHeader, buffer: &buffer)
    context.fireChannelRead(wrapInboundOut(packet))
    
    return .continue
} catch {
    if (error as? RemainingLengthError) == RemainingLengthError.incomplete {
        return .needMoreData
    } else {
        buffer = originalBuffer
        throw error
    }
}

This should cover it, and in general is the correct high-level pattern when reading from ByteBuffers. If you're comfortable that you and your users can consistently apply that pattern, then it's the right one.

I won't go much further down this line, but would a batch API be helpful here? Something like:

mutating func batchRead<T>(using closure: (_ buffer: inout ByteBuffer) throws -> T) rethrows -> T {
    let saved = self
    do {
        return try closure(&self)
    } catch {
        self = saved
        throw error
    }
}

It's a bit awkward to use, and using it within a ByteBuffer extension like my readEncodedString() leads to an overlapping access error, but could this be made to work?

Thanks again for this! Once I figured out how to get SPM to let me run tests, it found a bug immediately.