Handling long streams of data, or handling header and body efficiently?

I'm thinking about how implementing a protocol in SwiftNIO would work, where there are a variety of packet types, but one or more may contain a large payload. For myself, this is implementing SFTP; however, HTTP is a good analog as well. An example: a client connects to the server, and sends an HTTP PUT request with a 1 GB body as the file contents.

An HTTP request parser would not want to use a ByteToMessageDecoder that loads the entire body into a ByteBuffer. That would take up a lot of server memory, as well as delay the processing of the HTTP request until after the entire gigabyte has been uploaded. (What if the request is unauthorized? We would have wasted time, bandwidth, and resources before sending back an HTTP 4xx or killing the connection.)

I've looked into SwiftNIO's HTTP implementation for insight, but it seems to still be a step below what I'm thinking of. It seems to have a ByteToMessageDecoder object that converts the incoming stream of bytes into a single .head, zero or more .body (depending on the type of HTTP request), and a single .end enumeration case, where the .body cases are small chunks of data that were read off of the socket. This is as opposed to a single body.

Next, I looked at Vapor's implementation on top of SwiftNIO's NIOHTTP1/2. See:

Vapor does what I'm thinking of, at least partially. It uses NIOHTTP and the header, 0 or more bodies, and end (and keeps track of what it should be waiting for via an internal state on the handler, which makes sense). When it encounters a body chunk, it seems to write it to a stream set up on the header object that is passed on to be processed. The implementation of the BodyStream, however, stores a contiguous array of BodyStreamResults, which are similar to NIOHTTP's .body enum cases that hold a chunk of data from the socket connection. Once something attempts to read the body stream, any future incoming data immediately calls the handler instead of being stored in memory.

My initial apprehension of this is that memory may quickly be allocated to store a large amount of incoming data dynamically. If there is some kind of delay in beginning to read that incoming stream, or if writing that data to disk or DB is slower than it is coming in on the connection, wouldn't that take up too much RAM? Could this be a potential vector of Denial of Service?

(To be honest, I've been musing on this for a while now and have only just realized that, realistically, the time delay between storing incoming chunks of data and the application code doing something with the body stream is likely short, and memory wouldn't be consumed too badly, at least in an HTTP server scenario. It could matter more in another scenario, where the code perhaps waits on user input before reading the body. Example: Chrome or Safari asking you if you want to allow a website to download a file to your storage. The browser has received the HTTP header, but only starts reading the body stream after user input.)

There are two solutions, I think, to this problem:

  1. Stream the incoming data into a temp file in storage if the memory consumption begins to get too big
  2. Only read data off the socket when it's requested

First, for "Stream the incoming data into a temp file in storage if the memory consumption begins to get too big", I think this one can be somewhat straightforward. One could feasibly take something like Vapor's Request.BodyStream and add extra code to monitor its memory consumption, and if it gets too large, make a new temp file and dump all of the memory-stored data into it. Then, a future read operation on the stream would drain data out of the file (and then directly from the socket after that).

Second, for "Only read data off the socket when it's requested", this is where it gets tricky, I think.

The way I understand it, SwiftNIO will fire on the channel pipeline whenever and as soon as any data is available in the socket's read buffer. Once this is done, the code can read the given ByteBuffer or it can choose to do nothing with it. But that's the only chance it has to do so. If the code does nothing with the data, then that data is lost and the next channel pipeline invocation will use new data in the ByteBuffer. This is as opposed to ByteToMessageDecoder/Handler, which has its own ByteBuffer that grows from the incoming data off the socket until enough data meets the Decoder's needs, which then reads the buffer and the buffer can be re-used. I think this is correct.

But because of this, if the channel pipeline + ByteToMessageDecoder produced a header message, and let's say it called some asynchronous code to analyze and process it, that might be done on a Promise/Future, which means a new invocation on the EventLoop/Group on the thread. There's nothing to stop the EventLoop/Group from still invoking the channel pipeline with more data to read and process. I did a lot of research on run loops and event loops and I think this is correct. It'll almost be as if the incoming data is always coming in from the connection, in our example of a large file upload. If the header processor chose to wait 5 seconds before processing everything and the body, then that's 5 seconds worth of data still being read in by SwiftNIO and being buffered somewhere.

My question, then, would be if it's possible to temporarily not wait on the channel's socket file descriptor, which the EventLoop/Group internally uses in its epoll/kqueue system call. In this way, the kernel would fill up the descriptor's/process' data buffer, and TCP buffers would become full, which tells the client to temporarily stop sending new TCP segments, all until the server begins reading the data. This way, no buffer is needed, DoS prevention is a bit better, we don't have to worry about temp files and writing to storage or memory, etc.

The header handler would have to begin reading the body at some point, of course, either because the header was accepted and it wants to stream the data directly to its proper place in storage or in the database (using a much smaller buffer in the stream for this), or because the header was not accepted and the connection can be killed entirely, or maybe it just needs to drain the body into /dev/null (the void) to be able to respond with an error and process the next incoming request.

The closest thing I can find to this is the ChannelHandlerContext's fireChannelInactive to temporarily stop SwiftNIO from pushing data into the application when we want to instead wait, and then fireChannelActive to re-activate it once the stream is being read from. I'm not sure if that's what these functions actually do, though. I would fear they would kill the connection, or we aren't really supposed to use them like this, in normal read operations.

Is this wise to do, is solution #1 more practical, or is Vapor's solution good enough for most scenarios? I'm a bit confused on how to proceed with this, especially after performing research into run loops and how this works with threads and asynchronous operations.


Yes. This is known as "backpressure", and NIO supports it via the mechanism of the read function on ChannelOutboundHandler.

Under the hood, a NIO channel has a read cycle. The cycle is approximately as follows:

  1. The idle state. Here, the socket is not registered for readability notifications with the system selector: we are not asking to read any data. No reads can be delivered.
  2. A read() call that has passed through the pipeline reaches the head of the ChannelPipeline and is delivered to the Channel. This triggers NIO to register the socket with the selector for readability notification.
  3. Some time later (potentially as soon as the next event loop "tick") the socket is marked readable by the kernel. NIO begins processing reads on that channel.
  4. NIO now spins in a loop on the socket calling the read system call.
    1. Each call to read may read some data.
    2. If it does, channelRead is called with the data we read, and we loop around again.
  5. NIO runs this loop until either the read system call returns EAGAIN, indicating that there is no more data to be read at this time, or until ChannelOptions.maxMessagesPerRead calls have been made.
  6. At the end of the loop, we invoke channelReadComplete.
  7. Presuming we're still open and no new calls to Channel.read() came in the meantime, we return to the idle state and the sequence of operations here begins again.
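The cycle above can be sketched as a toy state machine in plain Swift. This is only a model under stated assumptions, not NIO's implementation: `ReadCycleModel`, `kernelBuffer`, and `socketBecameReadable` are hypothetical names, and the kernel's socket buffer is represented as a simple array of string chunks.

```swift
/// Toy model of the read cycle described above; not NIO's real implementation.
struct ReadCycleModel {
    enum State { case idle, waitingForReadable }

    private(set) var state: State = .idle
    private(set) var log: [String] = []
    /// Chunks the (pretend) kernel is holding for this socket.
    private(set) var kernelBuffer: [String]
    let maxMessagesPerRead: Int

    init(maxMessagesPerRead: Int, kernelBuffer: [String]) {
        self.maxMessagesPerRead = maxMessagesPerRead
        self.kernelBuffer = kernelBuffer
    }

    /// Step 2: a read() reached the channel, so register for readability.
    mutating func read() {
        state = .waitingForReadable
    }

    /// Steps 3 to 7: the selector reports the socket as readable.
    mutating func socketBecameReadable() {
        // Step 1: while idle we are not registered, so nothing is delivered.
        guard state == .waitingForReadable else { return }
        var messages = 0
        // Loop until the buffer is drained (the EAGAIN case) or the cap is hit.
        while messages < maxMessagesPerRead, !kernelBuffer.isEmpty {
            log.append("channelRead(\(kernelBuffer.removeFirst()))")
            messages += 1
        }
        log.append("channelReadComplete")
        state = .idle
    }
}

var model = ReadCycleModel(maxMessagesPerRead: 2, kernelBuffer: ["a", "b", "c"])
model.socketBecameReadable()   // idle: not registered, nothing is delivered
model.read()                   // register for readability
model.socketBecameReadable()   // delivers "a" and "b", then channelReadComplete
model.socketBecameReadable()   // idle again: "c" stays in the kernel buffer
print(model.log)
```

In this model the third chunk is only delivered after another `read()`, which is the property the rest of this post relies on.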

This should make it fairly clear how to stop NIO from reading from the socket: you stop calling Channel.read. But for most users this might be surprising: after all, they have never called Channel.read. They didn't even know it existed!

This is because NIO has a channel option called autoRead which is turned on by default for all NIO channels. This option will automatically call Channel.read if a read sequence completes and no read() call was already made. This means that in practice Channels are constantly having read called on them, but they call it on themselves.
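As a rough illustration of where this option lives, the sketch below builds a server bootstrap whose accepted connections start with autoRead turned off. The function name `makeManualReadBootstrap` is hypothetical; `ServerBootstrap`, `childChannelOption`, and `ChannelOptions.autoRead` are the NIO pieces assumed here.

```swift
import NIOCore
import NIOPosix

/// Hypothetical helper: accepted (child) channels start with autoRead off.
/// With autoRead off, some handler in the pipeline has to call read() itself,
/// otherwise no data will be delivered on the connection.
func makeManualReadBootstrap(group: EventLoopGroup) -> ServerBootstrap {
    ServerBootstrap(group: group)
        .childChannelOption(ChannelOptions.autoRead, value: false)
}
```

Leaving autoRead on and filtering read() calls in a handler is the other option, and it is the one the rest of this post describes.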

Happily, then, this gives us a good design for how to exert backpressure. To talk about how we do it, I'll use an example from the NIO repo itself: the HTTPServerPipelineHandler. This ChannelHandler does something quite close to what you're discussing: it arranges it so that an HTTP server that uses it never sees pipelined requests, even if a client sends them. To arrange that, it counts the messages passing through, and when it gets a request .end it doesn't allow any further request messages until the user writes their response .end. Instead it just buffers them, and replays them when the response .end comes through. Simple enough.

The wrinkle, as you identified, is that if this handler just buffered in memory then it would be a nasty little DoS vector. Remote users could pipeline a bunch of requests behind a long-running one and we'd hold them all in memory. Not good! So what it wants to do is, whenever it's passed on a full HTTP message and is waiting for the response, it would like to ask NIO to stop reading from the network.

To do that, it implements ChannelOutboundHandler.read. If it's happy to accept more data, it allows the read call straight through. But if it's waiting for the response .end, it will instead just mark that the read call happened, and do nothing. Later, once it has seen the response .end it was waiting for, it will start letting through read() calls and will trigger a read() if it dropped any earlier on.

This works seamlessly regardless of whether autoRead is on or not, by having the ChannelHandler reduce the read() pattern to the one it wants. The ChannelHandler doesn't issue the reads itself: that would prevent the user from managing their own backpressure. Instead, it adds constraints: if the user wanted more data, but we don't, we suppress the read. Otherwise, do what the user wants.
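A minimal sketch of that pattern, assuming a hypothetical `ReadGateHandler` with explicit `pause()` and `resume()` methods (the real HTTPServerPipelineHandler drives the equivalent state from the HTTP messages it sees). `ReadCounter` and `runGateDemo` are also hypothetical helpers, used only to observe which read() calls get past the gate on an `EmbeddedChannel`.

```swift
import NIOCore
import NIOEmbedded

/// Hypothetical handler: holds back read() calls while paused.
final class ReadGateHandler: ChannelOutboundHandler {
    typealias OutboundIn = ByteBuffer

    private var paused = false
    private var readSuppressed = false
    private var context: ChannelHandlerContext?

    func handlerAdded(context: ChannelHandlerContext) { self.context = context }
    func handlerRemoved(context: ChannelHandlerContext) { self.context = nil }

    func read(context: ChannelHandlerContext) {
        if paused {
            readSuppressed = true   // remember the request, but do not forward it
        } else {
            context.read()          // happy to accept more data: pass it through
        }
    }

    /// pause() and resume() must be called on the channel's event loop.
    func pause() { paused = true }

    func resume() {
        paused = false
        if readSuppressed {
            readSuppressed = false
            context?.read()         // replay the read we dropped earlier
        }
    }
}

/// Helper for the demo: counts the read() calls that make it past the gate.
final class ReadCounter: ChannelOutboundHandler {
    typealias OutboundIn = ByteBuffer
    private(set) var reads = 0
    func read(context: ChannelHandlerContext) {
        reads += 1
        context.read()
    }
}

func runGateDemo() throws -> [Int] {
    let counter = ReadCounter()
    let gate = ReadGateHandler()
    let channel = EmbeddedChannel()
    // The counter sits nearer the head, so a read() reaches the gate first.
    try channel.pipeline.syncOperations.addHandler(counter)
    try channel.pipeline.syncOperations.addHandler(gate)

    var seen: [Int] = []
    channel.read()
    seen.append(counter.reads)      // passed straight through
    gate.pause()
    channel.read()
    channel.read()
    seen.append(counter.reads)      // both suppressed
    gate.resume()
    seen.append(counter.reads)      // one read replayed
    return seen
}
```

Note that the gate never originates a read on its own: it only forwards or replays reads that someone else asked for, which is what keeps it compatible with autoRead being on or off.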

I'd like to add some detail while I'm here. Firstly, read calls are not cumulative: they are not like Combine's Demand. Each read call will trigger zero or one channelReadComplete calls. If two read calls are made before a channelReadComplete, they will trigger only one channelReadComplete: another read is needed. Essentially, reading is a binary state: we're either waiting for data or we aren't, and asking to read when we were already waiting doesn't change anything.
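To make the contrast concrete, here is a toy comparison in plain Swift between a cumulative demand counter and a binary read flag. Both types are hypothetical models written for this illustration; neither is Combine's nor NIO's actual code.

```swift
/// Toy model of cumulative demand: every request adds to an outstanding count.
struct CumulativeDemand {
    private(set) var outstanding = 0
    mutating func request() { outstanding += 1 }
    /// Returns true if a delivery was allowed.
    mutating func deliver() -> Bool {
        guard outstanding > 0 else { return false }
        outstanding -= 1
        return true
    }
}

/// Toy model of binary read interest: asking twice is the same as asking once.
struct ReadInterest {
    private(set) var waiting = false
    mutating func read() { waiting = true }
    /// Returns true if a read cycle (ending in channelReadComplete) happened.
    mutating func completeCycle() -> Bool {
        guard waiting else { return false }
        waiting = false
        return true
    }
}

var demand = CumulativeDemand()
demand.request()
demand.request()
// Two requests allow two deliveries.
let deliveries = [demand.deliver(), demand.deliver(), demand.deliver()]

var interest = ReadInterest()
interest.read()
interest.read()
// Two read() calls still produce only one cycle; another read() is needed.
let cycles = [interest.completeCycle(), interest.completeCycle()]
print(deliveries, cycles)
```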

Secondly, even if you exert backpressure you still need buffers. ChannelPipelines are still pipelines, and just because we stopped reading from the socket doesn't mean you'll immediately have no further data delivered to you. In the most straightforward case, stopping read calls will still potentially allow as many as maxMessagesPerRead more chunks of data to come your way! Backpressure works in combination with buffers in NIO-land, not instead of them.
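One way to combine the two, sketched under assumptions: a hypothetical `OneAtATimeBufferHandler` forwards one chunk at a time, queues whatever still arrives while that chunk is being processed, and holds back read() until the queue has drained. The `chunkProcessed()` method and the `runBufferDemo` helper are made up for this illustration.

```swift
import NIOCore
import NIOEmbedded

/// Hypothetical handler: one chunk in flight, the rest queued, reads held back.
final class OneAtATimeBufferHandler: ChannelDuplexHandler {
    typealias InboundIn = ByteBuffer
    typealias InboundOut = ByteBuffer
    typealias OutboundIn = ByteBuffer

    private var busy = false
    private var readSuppressed = false
    private var queued = CircularBuffer<ByteBuffer>()
    private var context: ChannelHandlerContext?

    func handlerAdded(context: ChannelHandlerContext) { self.context = context }
    func handlerRemoved(context: ChannelHandlerContext) { self.context = nil }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let chunk = self.unwrapInboundIn(data)
        if busy {
            queued.append(chunk)    // arrived after we stopped asking for data
        } else {
            busy = true
            context.fireChannelRead(self.wrapInboundOut(chunk))
        }
    }

    func read(context: ChannelHandlerContext) {
        if busy { readSuppressed = true } else { context.read() }
    }

    /// Call on the event loop once the previous chunk is fully processed.
    func chunkProcessed() {
        guard let context = self.context else { return }
        if !queued.isEmpty {
            // Drain the queue before asking the network for anything new.
            context.fireChannelRead(self.wrapInboundOut(queued.removeFirst()))
            context.fireChannelReadComplete()
        } else {
            busy = false
            if readSuppressed {
                readSuppressed = false
                context.read()
            }
        }
    }
}

func runBufferDemo() throws -> [String?] {
    let handler = OneAtATimeBufferHandler()
    let channel = EmbeddedChannel(handler: handler)
    func next() throws -> String? {
        try channel.readInbound(as: ByteBuffer.self).map { String(buffer: $0) }
    }
    _ = try channel.writeInbound(ByteBuffer(string: "a"))   // forwarded
    _ = try channel.writeInbound(ByteBuffer(string: "b"))   // queued
    var seen: [String?] = []
    seen.append(try next())
    seen.append(try next())
    handler.chunkProcessed()                                // releases "b"
    seen.append(try next())
    return seen
}
```

The queue here is bounded in practice by how much can arrive after reads stop, which is why suppressing read() keeps the buffer small rather than removing the need for it.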

Finally, this is an important design principle that all NIO Channels follow, even the ones that don't have sockets behind them. This works on socket channels, pipe channels, NIO Transport Services channels, and HTTP/2 stream channels.

This is the wrong thing to do. active/inactive are one-time switches, and they are controlled by the channel, not you. Those messages are notifications, not channel control operations, and merely sending them doesn't change the truth of what is happening.


Hi @lukasa, I greatly appreciate the long write up response, to my long question!

You raise a lot of good points and useful information, and I am going to look into this over the coming week. I'll take a closer look at HTTPServerPipelineHandler now that I know the importance of the read() method. I'll also look into autoRead, and try some experiments (using netcat, as it's an easy and basic way to simulate slow incoming data).

This is the wrong thing to do. active/inactive are one-time switches, and they are controlled by the channel, not you. Those messages are notifications, not channel control operations, and merely sending them doesn't change the truth of what is happening.

I figured as much. Thanks for the confirmation!

So, there are two main ways of achieving my second goal, of "Only read data off the socket when it's requested":

  1. Turn on .autoRead at the beginning of the connection (for most protocols at least), turn it off when the body is encountered, and turn it back on when the code wishes to process the incoming body.
  2. Disregard .autoRead by overriding _ChannelOutboundHandler.read and only allowing the ChannelHandlerContext.read to be performed when needed (beginning of connection, when we want to read the body)
    1. A call to .read() seems to incur a new channelRead according to the documentation, which seems expected.

In this example, it looks like it uses this mechanism for something slightly different. This is across entire requests, by preventing another entire request from being read before the current one has finished writing its response. My example, on the other hand, is within a single request, preventing the body from being read until the header is processed.

This makes sense. But I think that if the .read() calls are managed carefully, the buffers are more-or-less a constant size, and not as vulnerable to DoS.

Time to write a simple proof of concept. Thanks a lot for your help!!

That's correct, but that's a matter of business logic, not functionality. You can block read() calls whenever you want, for whatever reason you want.


@lukasa already answered all the questions here, but for others reading this, I thought it may be worth linking some other, very similar information available from this thread (and some images which may help) and that example as well. Reading similar thoughts explained slightly differently may help. They discuss exerting backpressure from the network (HTTP upload) to disk (saving the uploaded file).


I made a simple proof of concept to make sure this works correctly, and it does! Even with only one thread for the MultiThreadedEventLoopGroup.

import Dispatch
import NIO

private final class OneAtATimeHandler: ChannelInboundHandler, ChannelOutboundHandler {
	public typealias OutboundIn = ByteBuffer
	public typealias InboundIn = ByteBuffer
	public typealias OutboundOut = ByteBuffer

	// True while read() calls are allowed through to the channel.
	var read = true
	// Simulates slow processing: completes the promise after 5 seconds.
	var handler: ((ByteBuffer, EventLoopPromise<Void>) -> ())? = {
		buffer, promise in
		DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
			print("processed: \(buffer.getString(at: 0, length: buffer.readableBytes) ?? "")")
			promise.succeed(())
		}
	}

	public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
		let buffer = self.unwrapInboundIn(data)
		// Block further reads until this buffer has been processed.
		read = false

		print("received: \(buffer.getString(at: 0, length: buffer.readableBytes) ?? "")")

		let promise = context.eventLoop.makePromise(of: Void.self)
		// This callback runs on the channel's event loop once processing is done.
		promise.futureResult.whenComplete { _ in
			print("read = true")
			self.read = true
			// Ask for more data, in case a read() was suppressed while we were busy.
			context.read()
		}

		handler!(buffer, promise)
	}

	// Outbound read(): only forwarded while no buffer is being processed.
	public func read(context: ChannelHandlerContext) {
		print("read")
		if read {
			// The flag itself is only cleared in channelRead, once data arrives.
			print("did read. read = false")
			context.read()
		}
	}

	// Flush it out. This can make use of gathering writes if multiple buffers are pending
	public func channelReadComplete(context: ChannelHandlerContext) {
		context.flush()
	}
}

Open nc localhost <port> and type some input; the log looks like this, with the appropriate delays between each event. For instance, since a took 5 seconds to process and I typed in b, c, and d before it was finished, this was my log:

Server started and listening on [IPv6]::1/::1:8082
read
did read. read = false
received: a
read
(5 second delay)
processed: a
read = true
received: bcd
read
processed: bcd
read = true

I am still a bit confused on why the read() function is on ChannelOutboundHandler instead of ChannelInboundHandler, though. Reading off of the socket in a server scenario, you would think that's the role of the inbound. Especially because that is the handler that's responsible for the results of the read operations (the data/ByteBuffer).

Similarly, here is an implementation using just .autoRead:

private final class OneAtATimeHandler2: ChannelInboundHandler, ChannelOutboundHandler {
	public typealias OutboundIn = ByteBuffer
	public typealias InboundIn = ByteBuffer
	public typealias OutboundOut = ByteBuffer

	// Simulates slow processing: completes the promise after 5 seconds.
	var handler: ((ByteBuffer, EventLoopPromise<Void>) -> ())? = {
		buffer, promise in
		DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
			print("processed: \(buffer.getString(at: 0, length: buffer.readableBytes) ?? "")")
			promise.succeed(())
		}
	}

	public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
		let buffer = self.unwrapInboundIn(data)
		// Stop NIO from issuing read() on its own while this buffer is processed.
		// setOption returns a future; its result is deliberately ignored here.
		_ = context.channel.setOption(ChannelOptions.autoRead, value: false)

		print("received: \(buffer.getString(at: 0, length: buffer.readableBytes) ?? "")")

		let promise = context.eventLoop.makePromise(of: Void.self)
		// This callback runs on the channel's event loop once processing is done.
		promise.futureResult.whenComplete { _ in
			print("read = true")
			// Re-enable automatic reads and ask for the next chunk.
			_ = context.channel.setOption(ChannelOptions.autoRead, value: true)
			context.read()
		}

		handler!(buffer, promise)
	}

	// Flush it out. This can make use of gathering writes if multiple buffers are pending
	public func channelReadComplete(context: ChannelHandlerContext) {
		context.flush()
	}
}

Though, admittedly, it looks like .setOption() returns a Future, which means it may not be immediate (or maybe there's another option that wouldn't be immediate).

The examples above might be useful for a constant-sized buffer handler. If we have a large body of information coming in through the socket, and we wish to write this to a file in storage, my first post in this thread only considered reading the body of the request once the header has been validated. However, the same technique is useful beyond that: once the body begins to be read, we might only want to allow another read once the latest chunk is done being written to storage. Otherwise, if storage is too slow for the network, process memory could still fill up while waiting for storage buffers to flush (in an extreme scenario, of course).

That would lead to:

  1. Receive ByteBuffer from NIO
  2. Turn off .autoRead
  3. Pass to storage buffer
  4. Storage buffer async writes to storage, resolves promise
  5. Turn on .autoRead
  6. Wait for NIO to issue next read
  7. Repeat

I wonder whether, while this provides safety and better memory handling for async activities, the extra steps might slow things down somehow. I guess the overhead of the EventLoop events and handler calls isn't too bad; perhaps instead of one-at-a-time it could queue up to 5 ByteBuffers to the storage. But it could definitely slow down the TCP connection to the speed of the storage writes, plus a little extra for serializing the writes instead of queuing them. However, maybe that's a good thing! It keeps TCP from flooding the network during a large file transfer. The constant buffer can be larger than a single ByteBuffer to allow for faster transfer speeds at first, but for large files it would settle down a bit.

There's a lot that goes into this.

This is a common source of confusion. A similar question might be asked about why channelWritabilityChanged is on the inbound handler, even though only handlers that write care about it.

The reason is that ChannelInboundHandler is not a weird spelling of ChannelHandlerThatReads, and ChannelOutboundHandler is not a weird spelling of ChannelHandlerThatWrites. They group together events based on their source and direction of action. Inbound handlers operate on events that are caused by the network, and happen to your program. This is data arriving, flow control window states changing, connections activating and so on. Outbound handlers operate on events that are caused by your program, and happen to the network. This is things like local closure of connections, writing data, and, yes, reading from the network.
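A small sketch of that grouping, assuming a hypothetical `EventDirectionRecorder` handler that notes the direction of each event it sees before passing it on. The overridden method names are the NIO handler callbacks; the recorder, its `events` array, and `runDirectionDemo` are made up for this illustration.

```swift
import NIOCore
import NIOEmbedded

/// Hypothetical handler that records which direction each event travels in.
final class EventDirectionRecorder: ChannelDuplexHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundIn = ByteBuffer

    private(set) var events: [String] = []

    // Inbound: caused by the network, happening to the program.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        events.append("inbound channelRead")
        context.fireChannelRead(data)
    }

    func channelWritabilityChanged(context: ChannelHandlerContext) {
        events.append("inbound channelWritabilityChanged")
        context.fireChannelWritabilityChanged()
    }

    // Outbound: caused by the program, happening to the network.
    func read(context: ChannelHandlerContext) {
        events.append("outbound read")
        context.read()
    }

    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        events.append("outbound write")
        context.write(data, promise: promise)
    }

    func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        events.append("outbound close")
        context.close(mode: mode, promise: promise)
    }
}

func runDirectionDemo() throws -> [String] {
    let recorder = EventDirectionRecorder()
    let channel = EmbeddedChannel(handler: recorder)
    _ = try channel.writeInbound(ByteBuffer(string: "x"))   // data arriving
    channel.read()                                          // program asks for data
    _ = try channel.writeOutbound(ByteBuffer(string: "y"))  // program writes data
    return recorder.events
}
```

In this sketch, asking for data shows up next to write and close, because all three originate in the program rather than on the network.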

@lukasa That makes a lot of sense. It's not data inbound, it's networking events to our code inbound. And it's not data outbound, it's code events to the networking layer outbound. Thanks!
