Scaling outbound writes

I may be getting ahead of myself here, but I'm looking into how to implement many simultaneous channel writes efficiently, as I didn't see anything obvious for applying the same value to many channels simultaneously.

I have three core issues:

  1. I'm using a MessageToByteEncoder to encode my models into ByteBuffers to go over the network. However, there are scenarios where MQTT brokers are expected to write the same values to many (thousands, some brokers even support millions) simultaneous connections. The obvious solution is to encode the model into a ByteBuffer manually and use that one buffer to feed all of the channels. However, there doesn't seem to be an obvious way to bypass the OutboundOut type I use and send a ByteBuffer directly, as there's (intentionally, AFAICT) no way to manually wrap it in a NIOAny. I'd rather not have to drop my nice models entirely just for this one use case.

  2. My main issue is the need to send such a message to many (again, thousands to millions) channels simultaneously. I've used the ChatServer example for my initial implementation (though I used the DispatchQueue strictly as a lock), which iterates through all of the known channels, writing to each in turn. Since channels are thread-safe, I could turn this into a Dispatch.apply, but I was wondering if there was anything built in to achieve better parallelism here, or any approach I might be missing. AFAICT, there are no APIs on Channel or EventLoop to achieve what I want.

  3. In general, what approaches are most common or effective for doing work within a channel without blocking it? The ChatServer example uses a DispatchQueue for both locking and async work, but I don't think that's a scalable solution, especially since it could end up needing more threads than CPU cores. Is putting work onto a channel's EventLoop the expected method here? It appears to be a simple, Dispatch-like API for submitting work, so it seems like it would be effective. But is it recommended?

Thanks for any insight.

Sure there is, NIOAny has a generic public init.

So we don't have a primitive for this, though we could. The performance of this is pretty ok except that each Channel.write will need to do a separate dispatch onto the event loop. This enqueues a new closure, does some locking and unlocking, and some FD writes and reads: all in all, it's a lot of work. If you have thousands to millions of Channels this will be slow.

The natural primitive to use here is to change the way you store your Channels: specifically, to take advantage of the fact that they share EventLoops. You could do something like this:

import NIO
import NIOConcurrencyHelpers

class ChannelStore {
    private var channels: [ObjectIdentifier: [Channel]] = [:]
    private let lock = Lock()

    func addChannel(_ channel: Channel) {
        self.lock.withLockVoid {
            let eventLoopID = ObjectIdentifier(channel.eventLoop)

            // Weird construct, but it lets us use the modify accessor to avoid some 🐄 (CoW, copies-on-write).
            if self.channels[eventLoopID]?.append(channel) == nil {
                self.channels[eventLoopID] = [channel]
            }
        }
    }

    func writeToAllChannels(_ data: ByteBuffer) {
        self.lock.withLockVoid {
            for channels in self.channels.values {
                guard let eventLoop = channels.first?.eventLoop else {
                    continue
                }

                eventLoop.execute {
                    for channel in channels {
                        channel.writeAndFlush(data, promise: nil) // fire-and-forget; we don't need the write's future
                    }
                }
            }
        }
    }
}

We could try to wrap this primitive up for re-use, but for now this model will greatly reduce the cost of these operations because it will reduce the thread-hopping you have to do. One other note: you should consider whether this work should be batched to allow time for the event loop to issue reads (i.e. maybe you should group the writes into batches of no more than 100).
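To make the batching idea concrete, here's a sketch of one way to spell it. The function name `writeInBatches` and the batch size of 100 are illustrative choices, not NIO API; it assumes all the channels passed in share the given event loop, as in the store above.

```swift
import NIO

// Sketch: flush writes to a loop's channels in bounded batches, re-enqueueing
// the remainder so the event loop can interleave reads between batches.
func writeInBatches(_ data: ByteBuffer,
                    to channels: ArraySlice<Channel>,
                    on eventLoop: EventLoop,
                    batchSize: Int = 100) {
    guard !channels.isEmpty else { return }

    let batch = channels.prefix(batchSize)
    let remainder = channels.dropFirst(batchSize)

    eventLoop.execute {
        for channel in batch {
            channel.writeAndFlush(data, promise: nil)
        }
        // Each batch is enqueued as a separate task, giving reads a chance
        // to run in between.
        writeInBatches(data, to: remainder, on: eventLoop, batchSize: batchSize)
    }
}
```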

We have one: eventLoop.submit, eventLoop.execute, eventLoop.scheduleTask, and so on. However, in general you should only submit short or non-blocking tasks to event loops. The more work you do, the more you will block the I/O portion of the loop. If your work fits this model, dispatching it to an event loop is just fine.
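For illustration, the three spellings look roughly like this (a sketch against NIO's `EventLoop` API):

```swift
import NIO

func scheduleWork(on eventLoop: EventLoop) {
    // execute: fire-and-forget, enqueue a short, non-blocking closure.
    eventLoop.execute {
        print("quick bookkeeping on the event loop")
    }

    // submit: like execute, but returns an EventLoopFuture with the result.
    let sum: EventLoopFuture<Int> = eventLoop.submit {
        2 + 2 // keep work this short; anything long stalls the loop's I/O
    }
    sum.whenSuccess { print("computed \($0) on the loop") }

    // scheduleTask: run the closure later, still on the same loop.
    eventLoop.scheduleTask(in: .milliseconds(500)) {
        print("ran half a second later")
    }
}
```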

Yes, but every attempted use of it results in a runtime crash, due to not knowing how to properly unwrap the value. This includes manually creating the NIOAny as well as using the generic channel.writeAndFlush(buffer). Here's my very basic implementation:

func handlePublish(_ publish: Publish, in context: ChannelHandlerContext) {
    print("🐉 Received .publish for topic: \(publish.message.topic).")
    let subscribers = protector.sync { self.subscriptions[publish.message.topic, default: []] }
    
    guard !subscribers.isEmpty else { return }
    
    let channels = protector.sync { subscribers.compactMap { self.clientIDsToChannels[$0] } }
    
    print("🐉 Writing publish to topic: \(publish.message.topic) to all subscribing channels.")
    let data = Packet.publish(publish).encodable.packet.encoded
    var buffer = context.channel.allocator.buffer(capacity: data.count)
    buffer.writeBytes(data)
    channels.forEach {
        $0.writeAndFlush(NIOAny(buffer)).whenComplete { _ in
            print("🐉 Wrote publish to topic: \(publish.message.topic).")
        }
    }
}

Of course, writing the packet using wrapOutboundOut works fine, I'm just looking to avoid the separate encoding step for every channel.

I'll look into your channel handling and batching recommendations and see how they work out.

Ah, looks like it's due to attempting to unwrap it as a Packet, which is my OutboundOut type, and the type that my MessageToByteEncoder expects. So, perhaps I need to bypass the encoder for it to send correctly?

Edit: In order to do that I would need to use the context, which I can't for the other channels. Hmm...

But you seem to be writing ByteBuffers into the pipeline, right? That suggests that the data is already encoded but then I don’t quite understand what you’re doing with the MessageToByteEncoder.

I think the crash you’re seeing is MessageToByteEncoder expecting a Payload but receiving a ByteBuffer. Are you trying to add extra framing with your MessageToByteEncoder maybe?

Btw, you don’t need the NIOAny(buffer), you can just do channel.writeAndFlush(buffer).

Right. My ChannelInboundHandler normally operates on my Packet type, and I have the proper Decoder / Encoder pair to convert them to and from ByteBuffer. However, in order to not duplicate the encoding when I send many outgoing publish packets (which are all the same in some scenarios), I'd like to short circuit that and encode a buffer to be sent directly. Unfortunately that conflicts with my MessageToByteEncoder, which expects a Packet to come in. So is there a better way to do what I want, or a way to bypass the encoder for certain writes?

Ok, but that's the inbound side right? Ie. stuff received from the network.

Ok, I still can't follow 100% but I think there are only two options:

Option 1: You want to sometimes send Packet through the pipeline (outbound) and sometimes you want to send a pre-encoded ByteBuffer. The spelling for that is:

enum PossiblyPreEncodedPacket {
    case packet(Packet)
    case preEncoded(ByteBuffer)
}

and your OutboundOut type should then be PossiblyPreEncodedPacket. Ie.

struct PossiblyPreEncodedPacketEncoder: MessageToByteEncoder {
    typealias OutboundIn = PossiblyPreEncodedPacket

    func encode(data: PossiblyPreEncodedPacket, out: inout ByteBuffer) {
        switch data {
            case .preEncoded(var buffer):
                out.writeBuffer(&buffer) // see note at the end
            case .packet(let packet):
                /* encode packet into `out` */
        }
    }
}

Oh, and writing those pre-encoded messages into a Channel is just channel.writeAndFlush(PossiblyPreEncodedPacket.preEncoded(buffer)).

Option 2: You always want to send ByteBuffers because you always pre-encode them in which case you don't need the MessageToByteEncoder at all because you pre-encoded it before even sending it.

Does that make sense?

You shouldn't try to bypass any handlers, if you need a 'this can be type Foo or Bar', then the right answer is enum FooOrBar { case foo(Foo); case bar(Bar) } like the PossiblyPreEncodedPacket example above.


Note: Feel free to ignore this, let's deal with it once we've resolved the more important issues about the types. Using MessageToByteEncoder as I did (in the example for 'option 1') would work for you, but it's not the most efficient way of spelling what you need; that's easy to resolve later.

Right, I'm talking about my PacketHandler, which is a ChannelInboundHandler with InboundIn = Packet, and OutboundOut = Packet.

I think you're right, for my outbound type I can use the enum and it should work fine.

Got it, yes, the enum would be exactly the right choice because (for efficiency reasons) you have different possible types you'd like to write.

And as mentioned in the above note: Using a MessageToByteEncoder for this task isn't optimal. It works perfectly well if you always want to encode a Packet, but given that you sometimes want to just hand through the ByteBuffer, MessageToByteEncoder isn't perfect because it would force you to copy the ByteBuffer you already have into another one. So you might be better off writing a ChannelOutboundHandler instead; it's not that much harder (here's an example).
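For reference, such a handler might look roughly like this. This is a sketch: it assumes the `PossiblyPreEncodedPacket` enum from above, and the `encode(into:)` method on `Packet` is a hypothetical stand-in for however your packets serialize themselves.

```swift
import NIO

final class PossiblyPreEncodedPacketHandler: ChannelOutboundHandler {
    typealias OutboundIn = PossiblyPreEncodedPacket
    typealias OutboundOut = ByteBuffer

    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        switch self.unwrapOutboundIn(data) {
        case .preEncoded(let buffer):
            // Hand the existing buffer straight through: no extra copy at all.
            context.write(self.wrapOutboundOut(buffer), promise: promise)
        case .packet(let packet):
            var buffer = context.channel.allocator.buffer(capacity: 0)
            packet.encode(into: &buffer) // hypothetical: however your Packet serializes itself
            context.write(self.wrapOutboundOut(buffer), promise: promise)
        }
    }
}
```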

Actually, thinking about it, there is actually a pretty interesting and efficient way of using a MessageToByteEncoder:

struct PossiblyPreEncodedPacketEncoder: MessageToByteEncoder {
    typealias OutboundIn = PossiblyPreEncodedPacket

    func encode(data: PossiblyPreEncodedPacket, out: inout ByteBuffer) {
        switch data {
            case .preEncoded(let buffer):
                out = buffer // replace the buffer M2BE offers us ;)
            case .packet(let packet):
                /* encode packet into `out` */
        }
    }
}

With a straight ChannelOutboundHandler you'll still be able to achieve slightly better performance: in the pre-encoded case, we're just replacing the ByteBuffer that MessageToByteEncoder gives us with our own, but MessageToByteEncoder did some work to prepare that ByteBuffer, only for us to replace it and throw it away 🙂

Yeah, it works pretty well.

I'll look into scaling the channels next.

I'm also still interested in how people write handlers that need to do more work than an EventLoop should take on, while still balancing the loop's threads and external worker threads.

Awesome. See my latest edits: if you replace case .preEncoded(var buffer): out.writeBuffer(&buffer) with case .preEncoded(let buffer): out = buffer it should actually be faster 🙂. It's a bit weird but it'll be faster.

Cool, if you're after raw performance, do avoid DispatchQueues as locks in Swift; you'd pay roughly 10x more than you need to.

If what you're doing is compute intensive, then one single-threaded EventLoop (MultiThreadedEventLoopGroup(numberOfThreads: 1)) alongside a fixed-size thread pool is probably your best bet. Why? Because presumably you're doing very little I/O compared to the actual computations. That can (in certain cases) save you from doing any locking on the EventLoop, because you're left with just one thread.
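A sketch of that shape using NIO's NIOThreadPool; `expensiveTransform` here is a stand-in for whatever your actual compute-heavy work is:

```swift
import NIO

// Stand-in for the real compute-heavy transformation.
func expensiveTransform(_ input: ByteBuffer) -> ByteBuffer {
    return input
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) // one I/O thread
let pool = NIOThreadPool(numberOfThreads: System.coreCount) // fixed-size compute pool
pool.start()

func process(_ input: ByteBuffer, on eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
    // Run the heavy work on the pool; the resulting future fires back on the event loop.
    return pool.runIfActive(eventLoop: eventLoop) {
        expensiveTransform(input)
    }
}
```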

If your workload is more mixed, then you'll need to benchmark which exact bit is the bottleneck and scale that accordingly. Please feel free to reach out once you know where your bottleneck(s) are.

Yep!

Oh yeah. Since I'm not using the async behavior, using Lock may be a quick transition.

Sure thing. I expect my bottlenecks to be around subscribe, unsubscribe, and publish, given I need to look up topics by wildcard (eventually), so mainly dictionary lookup and some string parsing, perhaps some last will storage and behavior. Other than that most of the work is the packet processing. I'll see what the benchmarks say.


I found a quick benchmark which stresses publishing to topics. This really stressed my incoming packet parsing. Unexpectedly, fully 15% of my overall time (not just parsing, all time) was spent in this extension on Optional:

    extension Optional {
        func get(producingErrorDescription description: String = "Unexpectedly found nil instead of \(Wrapped.self).") throws -> Wrapped {
            switch self {
            case let .some(value): return value
            case .none: throw Error.unexpectedNil(description: description)
            }
        }
    }

Looking into it, it was due to the interpolation of Wrapped.self in the default argument. Removing the interpolated value and marking the function inline removed it from the call stack and improved performance quite a bit.
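Roughly, the fixed version looks like this (a sketch: `UnexpectedNilError` stands in for my real error type, and the key change is the static default message, so no `\(Wrapped.self)` interpolation runs on every call):

```swift
// Stand-in for the project's real Error.unexpectedNil case.
struct UnexpectedNilError: Error {
    let description: String
}

extension Optional {
    @inline(__always)
    func get(producingErrorDescription description: String = "Unexpectedly found nil.") throws -> Wrapped {
        switch self {
        case let .some(value): return value
        case .none: throw UnexpectedNilError(description: description)
        }
    }
}
```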

After that change I'm outperforming mosquitto for that simple benchmark.
