We're working on a NATS client using SwiftNIO and finding the write performance tuning quite challenging. We would like to be able to `write()` to the channel multiple times and then `flush()` a lot less often, but without explicitly exposing flush on the client API we're not seeing a good option. I looked at the mqtt-nio implementation, and they seem to be using `writeAndFlush()`, which we're trying to avoid. If you can suggest any ideas or examples we could look at, that'd be much appreciated. Thanks.
Can you elaborate a little on your problem? What kind of pattern do you want your users to be able to write, and what series of NIO calls do you want to translate it into?
Thanks for your reply. Users would be publishing messages, and each message is in turn serialized and passed to channel.write(). Of course there needs to be a flush at some point, but we don't want to expose that in the user API. In our benchmarks, if every publish translates into a channel.writeAndFlush(), we see very low message rates. We have been thinking of a couple of ways to improve this:
1 - Use multiple write() calls, then flush() less often to take advantage of vector writes. But we're not able to find a reliable way of calling flush() in this case, and unfortunately we can't employ a timeout-based strategy since all publish() calls are time sensitive.
2 - Create a batch buffer to collect all published messages while write() and/or flush() is busy doing I/O, then write the batches as they become available. Again, we don't want to introduce an additional call or a timeout-based buffer here.
We were hoping there must be a way to do this; we can't be the first ones to encounter this issue in the SwiftNIO community.
I'm also still getting my head around the basics like EventLoop, ByteBuffer, Futures and Promises, so apologies if I'm missing something obvious.
You are right that this is a common thing to come up in NIO, or really any networking code. You are also correctly outlining the two strategies that can be used here: either timeout-based flushing, or coalescing via some other event that happens regularly. Since you ruled out timeout-based flushing, let's talk about coalescing flushes.
A NIO channel has a read and a write side. When data is read from the socket, the `Channel` calls `fireChannelRead` N times down the channel pipeline, followed by a single `fireChannelReadComplete`. Most often new writes are generated when a `ChannelHandler` receives a `channelRead`. An optimisation that I have written a few times is to only `flush` inside `channelReadComplete`, which will result in coalescing all writes in a given read cycle. Now this only reliably works if your writes are produced synchronously as a result of a read. If you are producing asynchronously you might still coalesce some writes, but this is way less common and depends on the exact behaviour of your networking protocol + the application logic.
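As a rough sketch of the read-cycle coalescing described above: the handler below holds back any `flush` that arrives while a read cycle is in progress and issues one real flush from `channelReadComplete`. The names `FlushOnReadCompleteHandler` and `coalescingDemo` are made up for illustration, and the handler is assumed to sit closer to the socket than the handlers that produce the writes. `NIOEmbedded` is only used for the small usage check at the bottom.

```swift
import NIOCore
import NIOEmbedded  // only needed for the usage check at the bottom

/// Hypothetical handler: defers flushes seen during a read cycle and
/// replaces them with a single flush once the read cycle completes.
final class FlushOnReadCompleteHandler: ChannelDuplexHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundIn = ByteBuffer

    private var inReadCycle = false
    private var flushPending = false

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // A read cycle has started; flushes are held back from now on.
        inReadCycle = true
        context.fireChannelRead(data)
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        // Let later handlers react first so their writes are coalesced too.
        context.fireChannelReadComplete()
        inReadCycle = false
        if flushPending {
            flushPending = false
            context.flush()
        }
    }

    func flush(context: ChannelHandlerContext) {
        if inReadCycle {
            flushPending = true   // remember it, flush once in channelReadComplete
        } else {
            context.flush()       // no read cycle in progress: flush right away
        }
    }
}

/// Usage check: two write+flush pairs issued during one read cycle.
func coalescingDemo() throws -> (heldBack: Bool, released: Int) {
    let channel = EmbeddedChannel(handler: FlushOnReadCompleteHandler())
    var inbound = channel.allocator.buffer(capacity: 8)
    inbound.writeString("PONG\r\n")
    channel.pipeline.fireChannelRead(NIOAny(inbound))  // read cycle starts

    var frame = channel.allocator.buffer(capacity: 16)
    frame.writeString("PUB x 1\r\nx\r\n")
    channel.write(frame, promise: nil)
    channel.flush()
    channel.write(frame, promise: nil)
    channel.flush()
    // Nothing should have reached the outbound side yet.
    let heldBack = try channel.readOutbound(as: ByteBuffer.self) == nil

    channel.pipeline.fireChannelReadComplete()         // the one real flush
    var released = 0
    while try channel.readOutbound(as: ByteBuffer.self) != nil { released += 1 }
    return (heldBack, released)
}
```

Writes that are not triggered by a read (no read cycle in progress) are flushed immediately by this sketch, so it does not help a publish-only workload on its own.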
The protocol is fully async, so there are a lot of use cases where you won't see any reads but a ton of writes through publish() calls. The protocol is very simple: Client Protocol - NATS Docs
Example scenario:
```
S: INFO {"version":2,10....}\r\n
C: CONNECT {"verbose":false,.....}\r\n
C: PING\r\n
S: PONG\r\n
C: PUB x 1\r\nx\r\n
C: PUB x 1\r\nx\r\n
C: PUB x 1\r\nx\r\n
....there can be thousands of PUBs sent to server sizes typically ~1-10Kb...
(ping-pong every 30 seconds)
```
Instead of write + flush we have two other API patterns that are in use today in different networking libraries.
- SwiftNIO's new Swift Concurrency APIs use `NIOAsyncWriter.yield(contentsOf:)` to write a `Sequence` of `Element`s to the `Channel`, which automatically flushes at the end of the `Sequence`.
- Network.framework uses `NWConnection.batch(_:)` to coalesce writes during the execution of the provided closure.
Thank you for all the replies. I have made some progress, but I'm not able to reach the level of performance of other NATS clients.
The idea is to batch messages up to a preset limit and automatically flush them when no write-and-flush call is in progress, giving batching a chance on the assumption that a writeAndFlush() call takes longer than copying memory around for batching.
The problem might be that although the write() call is async, the flush() call seems to be sync. Does that mean it's blocking, or is it a fire-and-forget call?
The `BatchBuffer` implementation improves performance about 10x, but this is still another order of magnitude less than the Go clients.
(Sorry for the giant copy-paste. There is also a repo if you want to try it yourself: GitHub - mtmk/swift-nats-experimental)
```swift
import NIOConcurrencyHelpers
import NIOCore

class BatchBuffer {
    private let batchSize: Int
    private var buffer: ByteBuffer
    private let channel: Channel
    private let lock = NIOLock()
    private var waitingPromises: [EventLoopPromise<Void>] = []
    private var isWriteInProgress: Bool = false

    init(channel: Channel, batchSize: Int = 16 * 1024) {
        self.batchSize = batchSize
        self.buffer = channel.allocator.buffer(capacity: batchSize)
        self.channel = channel
    }

    func write<Bytes: Sequence>(_ data: Bytes) async throws where Bytes.Element == UInt8 {
        // Batch writes and if we have more than the batch size
        // already in the buffer await until buffer is flushed
        // to handle any back pressure
        try await withCheckedThrowingContinuation { continuation in
            self.lock.withLock {
                guard self.buffer.readableBytes < self.batchSize else {
                    let promise = self.channel.eventLoop.makePromise(of: Void.self)
                    promise.futureResult.whenComplete { result in
                        switch result {
                        case .success:
                            _ = self.lock.withLock {
                                self.buffer.writeBytes(data)
                            }
                            self.flushWhenIdle()
                            continuation.resume()
                        case .failure(let error):
                            continuation.resume(throwing: error)
                        }
                    }
                    self.waitingPromises.append(promise)
                    return
                }
                self.buffer.writeBytes(data)
                continuation.resume()
            }
            flushWhenIdle()
        }
    }

    func clear() {
        lock.withLock {
            self.buffer.clear()
        }
    }

    private func flushWhenIdle() {
        channel.eventLoop.execute {
            // We have to use lock/unlock calls rather than the withLock
            // since we need writeBuffer reference
            self.lock.lock()
            // The idea is to keep writing to the buffer while a writeAndFlush() is
            // in progress, so we can batch as many messages as possible.
            guard !self.isWriteInProgress else {
                self.lock.unlock()
                return
            }
            // We need a separate write buffer so we can free the message buffer for more
            // messages to be collected.
            guard let writeBuffer = self.getWriteBuffer() else {
                self.lock.unlock()
                return
            }
            self.isWriteInProgress = true
            self.lock.unlock()

            let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
            writePromise.futureResult.whenComplete { result in
                // Collect the waiters under the lock but complete them after
                // releasing it: their callbacks run inline on the event loop
                // and take the lock again, and NIOLock is not reentrant.
                let waiters: [EventLoopPromise<Void>] = self.lock.withLock {
                    self.isWriteInProgress = false
                    let waiters = self.waitingPromises
                    self.waitingPromises.removeAll()
                    return waiters
                }
                switch result {
                case .success:
                    waiters.forEach { $0.succeed(()) }
                case .failure(let error):
                    waiters.forEach { $0.fail(error) }
                }
                // Check if there are any pending flushes
                if self.lock.withLock({ self.buffer.readableBytes > 0 }) {
                    self.flushWhenIdle()
                }
            }
            self.channel.writeAndFlush(writeBuffer, promise: writePromise)
        }
    }

    private func getWriteBuffer() -> ByteBuffer? {
        guard buffer.readableBytes > 0 else {
            return nil
        }
        var writeBuffer = channel.allocator.buffer(capacity: buffer.readableBytes)
        writeBuffer.writeBytes(buffer.readableBytesView)
        buffer.clear()
        return writeBuffer
    }
}
```
It's fire and forget.
So after a quick glance at the code I'm going to make a few suggestions.
Firstly, avoid using `Data` at your `send` entry point. This ends up forcing you to perform an extra data copy whenever you perform a write, e.g. of a `PING` message, because you're transforming from `String` to `Data` to `ByteBuffer`, and each transformation involves a byte copy. Instead, it's better to handle `ByteBuffer` directly here if you can.
Relatedly, consider avoiding this generic entry point and instead again take a `ByteBuffer`. The further up the stack you can start using `ByteBuffer` the better your performance will be, as you can avoid incurring extra copies. In this case this won't be a huge performance impact as you're ultimately going to hit a generic specialization, but it's still nicer to avoid the risk.
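To make the "use `ByteBuffer` as early as possible" advice concrete, here is a minimal sketch that encodes `PING` and `PUB` frames straight into a `ByteBuffer`, without going through `Data`. The `NATSFrame` type and its method names are hypothetical and not taken from the client; the frame layout follows the example scenario earlier in the thread.

```swift
import NIOCore

/// Hypothetical helper: encodes NATS frames directly into ByteBuffers.
enum NATSFrame {
    /// Builds a PING frame from a static string, so there is no
    /// String -> Data -> ByteBuffer chain.
    static func ping(allocator: ByteBufferAllocator) -> ByteBuffer {
        var buffer = allocator.buffer(capacity: 6)
        buffer.writeStaticString("PING\r\n")
        return buffer
    }

    /// Appends `PUB <subject> <size>\r\n<payload>\r\n` to `buffer`.
    /// The payload bytes are copied once, into the outgoing buffer.
    static func publish(subject: String, payload: ByteBuffer, into buffer: inout ByteBuffer) {
        buffer.writeString("PUB \(subject) \(payload.readableBytes)\r\n")
        buffer.writeImmutableBuffer(payload)
        buffer.writeStaticString("\r\n")
    }
}
```

Appending several `publish` frames into the same outgoing buffer also gives a simple form of batching, since the whole buffer can then be handed to the channel as one write.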
Next, consider whether having your `write` method be `async` is actually serving you. In many cases this will return immediately having written into the buffer, and will only occasionally actually suspend for I/O. This makes it a bit unclear what it means for the write to suspend. In general I'd say you should either always suspend, or never suspend: it's rarely wise to suspend depending on an internal state like this one.
With that we can try to work through the issue with the buffer itself. Right now I think the real concern I have is that this buffer is quite complex, and so quite aside from everything else it's challenging to understand exactly how it works. My initial recommended refactor would be to move the buffering logic entirely into a NIO `ChannelHandler`. This avoids the need for the extra lock, instead relying on the `Channel`'s own mutual exclusion and greatly simplifying the logic.
My next suggestion would be to avoid waiting for the write promises to complete, and instead to start using NIO's `channelWritabilityChanged` notifications. These rely on NIO's internal backpressure management system, and will give you a strong clue that you can back off. This covers most of what this buffer does, which only meaningfully reduces flush calls in a limited number of cases. This will also minimise the number of promises in flight, which is very helpful.
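A minimal sketch of listening for writability changes, under the assumption that the client wants a simple "pause/resume publishing" signal. `WritabilityObserver` and `writabilityDemo` are hypothetical names; the only NIO pieces relied on are the `channelWritabilityChanged(context:)` callback and `Channel.isWritable`. `NIOEmbedded` is only used for the usage check.

```swift
import NIOCore
import NIOEmbedded  // only needed for the usage check at the bottom

/// Hypothetical handler: reports each writability change to a callback so
/// the layer above can stop producing when the channel's outbound buffer is
/// full and resume once it has drained.
final class WritabilityObserver: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer

    private let onChange: (Bool) -> Void

    init(onChange: @escaping (Bool) -> Void) {
        self.onChange = onChange
    }

    func channelWritabilityChanged(context: ChannelHandlerContext) {
        // false: back off; true: safe to write again.
        onChange(context.channel.isWritable)
        context.fireChannelWritabilityChanged()
    }
}

/// Usage check: simulate the channel becoming unwritable, then writable.
func writabilityDemo() -> [Bool] {
    var seen: [Bool] = []
    let channel = EmbeddedChannel(handler: WritabilityObserver { seen.append($0) })
    channel.isWritable = false
    channel.pipeline.fireChannelWritabilityChanged()
    channel.isWritable = true
    channel.pipeline.fireChannelWritabilityChanged()
    return seen
}
```

On a real channel the event is fired by NIO itself when the amount of buffered outbound data crosses the configured write buffer water marks (`ChannelOptions.writeBufferWaterMark`); the callback runs on the channel's event loop.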
Third, and perhaps more importantly, we need to consider whether you can get a hint from the user as to whether you can coalesce. This will lead to the most effective operation. In this case, something like a scoped writer will be useful:
```swift
client.withBatch { batch in
    batch.write()
    batch.write()
    batch.write()
}
```
Clients that are emitting a large number of messages can use this interface to give you a "hint" that they are going to send more. This avoids a need for an explicit `flush` message, but allows performance-sensitive clients to tune their I/O.
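One way such a scoped writer could be built on top of a `Channel` is sketched below. `SketchClient`, `BatchWriter`, and `batchDemo` are hypothetical names used only for illustration, and frames are assumed to be already encoded as `ByteBuffer`s. Writes inside the closure are enqueued without flushing, and a single flush is issued when the closure exits.

```swift
import NIOCore
import NIOEmbedded  // only needed for the usage check at the bottom

/// Hypothetical scoped writer handed to the `withBatch` closure.
struct BatchWriter {
    let channel: Channel

    /// Enqueues the frame on the channel without flushing.
    func write(_ frame: ByteBuffer) {
        channel.write(frame, promise: nil)
    }
}

/// Hypothetical client type; only the publishing surface is sketched.
final class SketchClient {
    private let channel: Channel

    init(channel: Channel) {
        self.channel = channel
    }

    /// Single publish: write and flush immediately.
    func publish(_ frame: ByteBuffer) {
        channel.writeAndFlush(frame, promise: nil)
    }

    /// Batched publish: every write made in `body` is enqueued, and one
    /// flush is issued when the closure returns or throws.
    func withBatch(_ body: (BatchWriter) throws -> Void) rethrows {
        defer { channel.flush() }
        try body(BatchWriter(channel: channel))
    }
}

/// Usage check: three writes in a batch, counted before and after the flush.
func batchDemo() throws -> (flushedInsideBatch: Int, flushedAfterBatch: Int) {
    let channel = EmbeddedChannel()
    let client = SketchClient(channel: channel)
    var frame = channel.allocator.buffer(capacity: 16)
    frame.writeString("PUB x 1\r\nx\r\n")

    var inside = 0
    try client.withBatch { batch in
        batch.write(frame)
        batch.write(frame)
        batch.write(frame)
        // Count what has already been flushed while still inside the batch.
        while try channel.readOutbound(as: ByteBuffer.self) != nil { inside += 1 }
    }
    var after = 0
    while try channel.readOutbound(as: ByteBuffer.self) != nil { after += 1 }
    return (inside, after)
}
```

When called from outside the event loop, each `write` and the final `flush` are enqueued onto the event loop in call order, so the flush still lands after the batched writes. Callers that publish one message at a time keep using `publish`, and never see a flush in the API.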
A final question: do you happen to be at KubeCon 24? If you are, I'm there this week and would be happy to work through this with you in some more detail.
Thanks for the detailed response. Really appreciated!
I will need to go through it, but my quick takeaway is that using `channelWritabilityChanged` is probably the best way to handle this.
I'm not at KubeCon 24, but Tomasz and Piotr (who are the main Swift client developers) should be around the NATS.io stand. It'd be awesome if you all can meet.
PS: never mind the experimental repo above (although the `BatchBuffer` type is still pretty much exactly the same); the first release of the client is just out: GitHub - nats-io/nats.swift: Swift client for NATS, the cloud native messaging system.