Creating a Combine Publisher from an NIO ChannelHandler, and file writing

Hi all,

I'm working on implementing a file transfer protocol in NIO, which will have an incoming stream of ByteBuffer objects (I'm not really sure how large the ByteBuffers would be in practice, but in total they could reach gigabytes at the extreme). To avoid large lag times, I don't want to accumulate all of a file's data in memory from the network before writing any of it to a file.

I think it would be a fun exercise to use Combine to help with this, but I wanted to get input to make sure I'm on the right track.

Problem 1: Preventing incoming data before the Subscriber is made

So, my first guess would be to use a PassthroughSubject. The ChannelHandler would create a class with some header information as well as a PassthroughSubject, and hold onto a reference to it until the end of the data stream. When data comes in, it would write to the PassthroughSubject, which passes it down the Combine pipeline, eventually ending in a write to an NIOFileHandle.

The problem, however, is that the Combine pipeline may not be fully set up yet. There may be an asynchronous delay before the downstream code subscribes to the PassthroughSubject, for example because the file handle is opened asynchronously. And, as far as I have researched, any data sent into the PassthroughSubject before a subscriber exists is lost. CurrentValueSubject would only keep the latest ByteBuffer, not all of them.

So, one solution would be to buffer the incoming data. For this problem alone, Publishers.Buffer may be suitable (possibly with a size of Int.max so that no incoming ByteBuffers are dropped).
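Roughly what I have in mind, as a sketch (the FileStreamHandler name is just a placeholder, and this still drops anything sent before the subscription exists, which is exactly the worry above):

```swift
import Combine
import NIO

// Sketch: feed inbound ByteBuffers from a ChannelHandler into a
// PassthroughSubject. "FileStreamHandler" is a made-up name.
final class FileStreamHandler: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer

    let subject = PassthroughSubject<ByteBuffer, Error>()

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Push each inbound chunk into the Combine pipeline.
        subject.send(self.unwrapInboundIn(data))
    }

    func channelInactive(context: ChannelHandlerContext) {
        subject.send(completion: .finished)
        context.fireChannelInactive()
    }
}

// Downstream (set up later, e.g. once the file handle is open): buffer with
// Int.max so nothing is dropped once we *are* subscribed. Anything sent
// before this subscription exists is still lost.
let handler = FileStreamHandler()
let cancellable = handler.subject
    .buffer(size: Int.max, prefetch: .byRequest, whenFull: .dropNewest)
    .sink(receiveCompletion: { _ in },
          receiveValue: { byteBuffer in
              // write `byteBuffer` to the NIOFileHandle here
              _ = byteBuffer
          })
```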

However, I think that telling NIO to stop reading from the socket until the pipeline is established would be cleaner. This would mean toggling the .autoRead option (see Handling long streams of data, or handling header and body efficiently? - #4 by jamamp). Such a solution sounds like it would require a custom Publisher, Subject, or operator with callbacks to enable or disable .autoRead in NIO at will.
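Toggling that option would presumably be something like this (just a sketch):

```swift
import NIO

// Sketch: pause/resume socket reads by flipping the channel's autoRead option.
func pauseReading(on channel: Channel) -> EventLoopFuture<Void> {
    channel.setOption(ChannelOptions.autoRead, value: false)
}

func resumeReading(on channel: Channel) -> EventLoopFuture<Void> {
    channel.setOption(ChannelOptions.autoRead, value: true)
}
```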

Problem 2: Limiting file handle writes?

Writing to an open NIOFileHandle returns an EventLoopFuture<()>, which makes me think that multiple synchronous calls to write ByteBuffers would either be performed concurrently or be cached/queued somewhere. If the network delivers data faster than the storage can ingest it, system memory would end up queuing the data from the network before it can get written to storage.

As an extension to my "Problem 1" section above, would it be beneficial to have the proposed custom Publisher/Subject/operator also tell NIO to disable .autoRead once a certain threshold is met? I suppose this would be considered "backpressure" in a way. I'm not sure whether it makes sense in practice, or how that threshold should be calculated.

If it should be done serially, such as enabling/disabling .autoRead between each successful file write of a ByteBuffer using the EventLoopFutures, would that significantly slow down the system?

Problem 3: Combine

Is this even something that Combine is suited for? I think Combine might be better suited to relaying events than to carrying data. We have to be careful that no data is lost, and Combine seems fine with dropping data when needed.

I was toying around with just writing a manual class to handle all of the above logic myself, but I'm worried about implementation details like thread safety.

Closing thoughts

I would like to use Combine (to learn it better) but am not sure it's the right tool for the job in this scenario. I've searched online for "backpressure" but I'm not sure that term applies to my problem domain here. I'm also not sure how the kernel handles many concurrent writes to a file handle/descriptor, or whether I need to throttle that myself.

Some guidance would be much appreciated! And, if this is much more of a Combine question than NIO, I can post this in a more appropriate area.

As I’ve noted in the above thread, I recommend against toggling autoRead for this, and instead suggest interacting with the read call itself. autoRead is a tempting lever, but it’s a bit hefty, and managing read composes better. Otherwise, yes, this approach will work, but you would need a custom publisher.
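As a rough sketch of what I mean by managing read (the handler name and the demand bookkeeping are illustrative, and this assumes the demand callback arrives on the channel's event loop):

```swift
import NIO

// Sketch: a duplex handler that swallows read() until the Combine side has
// signalled demand. "ReadGate" and "addDemand" are made-up names.
final class ReadGate: ChannelDuplexHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundIn = Never

    private var context: ChannelHandlerContext?
    private var readPending = false
    private var demand = 0   // chunks the downstream is currently willing to take

    func handlerAdded(context: ChannelHandlerContext) {
        self.context = context
    }

    func handlerRemoved(context: ChannelHandlerContext) {
        self.context = nil
    }

    func read(context: ChannelHandlerContext) {
        if demand > 0 {
            context.read()       // let the read through to the socket
        } else {
            readPending = true   // hold it until demand arrives
        }
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        demand = max(0, demand - 1)
        context.fireChannelRead(data)
    }

    // Called by the Combine bridge (on the event loop) when demand increases.
    func addDemand(_ n: Int) {
        demand += n
        if readPending, let context = self.context {
            readPending = false
            context.read()
        }
    }
}
```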

The ultimate problem here is that you’re trying to bridge between two streaming abstractions, each with their own backpressure mechanism. This is never going to be trivial.

The returned EventLoopFuture is a low-level primitive whose intent is to allow you to propagate backpressure: that is, we tell you when the write is done. This makes it easy to have exactly one write in flight.
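For example, chaining each write onto the previous future keeps exactly one write in flight (a sketch, assuming NonBlockingFileIO and an already-open NIOFileHandle, and ignoring error handling):

```swift
import NIO

// Sketch: serialise file writes by chaining each one onto the previous future.
final class SerialFileWriter {
    private let fileIO: NonBlockingFileIO
    private let fileHandle: NIOFileHandle
    private let eventLoop: EventLoop
    private var lastWrite: EventLoopFuture<Void>

    init(fileIO: NonBlockingFileIO, fileHandle: NIOFileHandle, eventLoop: EventLoop) {
        self.fileIO = fileIO
        self.fileHandle = fileHandle
        self.eventLoop = eventLoop
        self.lastWrite = eventLoop.makeSucceededFuture(())
    }

    /// Queues `buffer` behind whatever write is still in flight.
    @discardableResult
    func write(_ buffer: ByteBuffer) -> EventLoopFuture<Void> {
        lastWrite = lastWrite.flatMap {
            self.fileIO.write(fileHandle: self.fileHandle,
                              buffer: buffer,
                              eventLoop: self.eventLoop)
        }
        return lastWrite
    }
}
```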

However, your instinct that toggling autoRead between each successful file write would slow things down is right. It’s a good way to manage backpressure, but it is excessively slow.

In networking this kind of problem is usually solved with buffering. Each node in a network will propagate backpressure, but it will also have a buffer of data it’s willing to store before doing so. The reason is that this reduces the latency experienced when responding to backpressure.

Consider the case where the client can produce data 5x faster than the filesystem can accept it. If you have no buffer, the first datum is sent to the filesystem, but you then refuse to accept more. The client will then be idle for enough time to produce 4 more messages before the write even completes. But when it does, you have to ask the client for another message, which it then goes off to produce. This means you’re strictly serialising the system: no good! If you had a buffer of size 1, when the write completes you could immediately submit the next write to the filesystem and then ask the client for data. This lets the client submit data asynchronously while the filesystem is writing. The total load on the client is the same, but throughput improves.

So ideally you’d have a buffer you were willing to fill before you toggled autoRead, and you’d serve your Combine subscribers from that buffer. In the best version you’d also have high and low watermarks, such that you could start asking the client for more data before you completely empty the buffer.
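Shaped roughly like this, as a sketch (the names are made up, and synchronisation between the event loop and the Combine side is ignored):

```swift
import NIO

// Sketch: a high/low-watermark buffer that pauses reading above the high
// mark and resumes it once drained below the low mark.
final class WatermarkBuffer {
    private var buffer: [ByteBuffer] = []
    private let lowWatermark: Int
    private let highWatermark: Int
    private let pauseReading: () -> Void
    private let resumeReading: () -> Void
    private var paused = false

    init(low: Int, high: Int,
         pauseReading: @escaping () -> Void,
         resumeReading: @escaping () -> Void) {
        self.lowWatermark = low
        self.highWatermark = high
        self.pauseReading = pauseReading
        self.resumeReading = resumeReading
    }

    func append(_ chunk: ByteBuffer) {
        buffer.append(chunk)
        if !paused && buffer.count >= highWatermark {
            paused = true
            pauseReading()
        }
    }

    func next() -> ByteBuffer? {
        guard !buffer.isEmpty else { return nil }
        let chunk = buffer.removeFirst()
        if paused && buffer.count <= lowWatermark {
            paused = false
            resumeReading()
        }
        return chunk
    }
}
```

The pause/resume closures would be whatever mechanism ends up stopping and starting reads in NIO.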

In the abstract, Combine is fine for this, but the difficulty is with composing it into the NIO pipeline in a way where everything glues together nicely. It’s absolutely doable, but it’s not trivial.


You're totally correct. It's been a bit since I wrote that, and I've since tunneled down into this rabbit hole, and didn't read my previous thread in full. I'll be sure to take another read when implementing this.

Agreed!

Also totally correct. I think I would implement it so that it initially fills a buffer before the Subscriber subscribes, so that when the subscription is set up there's already data available. That would also lend itself well to the problem you described.

For example, we fire off the header and begin filling a five-ByteBuffer buffer of data, and reading is disabled. The Subscriber subscribes and the first ByteBuffer is written. Once that completes, the first is removed from the buffer, reading is re-enabled, and the next ByteBuffer begins writing to the file system while another ByteBuffer (cached by the kernel for TCP at this point) is retrieved and added to my buffer.

And if I wanted to get really fancy, it could flush the buffered ByteBuffers to the filesystem up to a certain point, keep up to, say, 5 pending file-system writes at a time, keep that "buffer" full, and disable NIO's reading while those are pending, if that ever happens. This might be unnecessary, though, so I'll see how the simpler version performs first.

As for breaking this out, it seems I need a new buffering publisher or subject that allows sending data into the pipeline, along with callbacks for enabling/disabling that ingress. The other half would be the dynamic buffering. But the dynamic buffering needs to be what tells the upstream publisher not to produce data, so this might end up as an all-in-one package; otherwise I'd have two publishers, each with callbacks telling the upstream to stop sending data. This might be where Subscribers.Demand comes in, though I'd have to look into that more: the buffer could switch between a demand of 0 and >0, and the upstream would then be what tells NIO to start/stop reading.

I think this is the next step of my project. Hopefully I can get something working and unit tested, and I'll reply back here with results! If it's useful enough I could release it as its own library for others to use (or contribute to other Combine aggregate repos; speaking of which, those might be good to look at for proper implementations of the Combine protocols). I wonder if this would be a useful bridge from NIO to Combine in general, converting a ChannelHandler into a Publisher with buffering (see Will SwiftNIO adapt to the new Combine framework?). I tried looking for such a thing but was unsuccessful.

Side note: the reason I type all of this out is to help process my thoughts. And if it helps someone else with their problem, that's another plus!

Thanks for the thoughtful and detailed responses, @lukasa!


Sounds like a good architecture! Please do report back with any of your work; it sounds like it'll be really useful for the wider community.


Small update!

I found it very difficult to wrap my head around the internals of Combine pipelines and how exactly the protocol is followed, so instead of staring at my screen blankly I found it helpful to diagram it all out. I'm going to use this as a guide for creating each of my two Publishers and the overall pipeline.

I might tweak this a bit to combine the PassthroughSubject and the ExposedDemandPublisher, though.

The most important thing is that the ExposedDemandPublisher responds to the BufferedDataPublisher's demand (a Combine concept) and bridges it over to the NIO ChannelHandler via a closure; the ChannelHandler is then responsible for not reading from the TCP socket.

Once a file write operation succeeds and the EventLoopFuture finishes, it tells the BufferedDataSubscription that it has completed; the subscription can then clear that item from the buffer and signal the increased demand to the upstream ExposedDemandSubscription, which bridges it over to NIO, enabling socket reading, which produces the next ByteBuffer to go down the pipeline. I'll need to figure out exactly how to bridge the EventLoopFuture completion to the SinkSubscriber or BufferedDataSubscriber, though. This might require a custom Subscriber instead of Sink.
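The demand-bridging part might end up looking roughly like this (a sketch only; the name and the closure shape are placeholders, and thread safety is glossed over):

```swift
import Combine
import NIO

// Sketch: a Subscription that turns Combine demand into a
// "start/stop reading" signal toward the NIO side.
final class DemandBridgeSubscription<S: Subscriber>: Subscription
    where S.Input == ByteBuffer, S.Failure == Error {

    private var subscriber: S?
    private var outstandingDemand = Subscribers.Demand.none
    private let onDemand: (Bool) -> Void   // true = start reading, false = stop

    init(subscriber: S, onDemand: @escaping (Bool) -> Void) {
        self.subscriber = subscriber
        self.onDemand = onDemand
    }

    func request(_ demand: Subscribers.Demand) {
        outstandingDemand += demand
        onDemand(outstandingDemand > .none)
    }

    /// Called by the channel handler when a chunk arrives from NIO.
    func receive(_ chunk: ByteBuffer) {
        guard let subscriber = subscriber else { return }
        // In the real pipeline a buffer would sit here; the sketch assumes
        // upstream only sends while demand has been signalled.
        outstandingDemand -= 1
        outstandingDemand += subscriber.receive(chunk)
        if outstandingDemand == .none { onDemand(false) }
    }

    func cancel() {
        subscriber = nil
        onDemand(false)
    }
}
```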

Also, OpenCombine (GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for processing values over time) has been a great help in understanding the internals of Combine!


For the downstream, I think I will go for a special Sink, called FutureSink, which is initialized with a maximum number of concurrent Futures and a closure that returns a Future. This way, when the FutureSink subscribes, it demands only up to that number of values from upstream. When it receives a value, it gets a Future back from the closure. When the Future resolves, it sends an increased demand of +1 to the upstream BufferedDataSubscription.

This does change the diagram a bit, though. The BufferedDataSubscription is no longer the one holding onto the value until the Future ends; now it'll be the FutureSink. Once BufferedData responds to the sink's demand, it will have to get rid of the value and ask for more data from upstream. Only once the FutureSink reaches the maximum number of concurrent Futures does the demand reach 0, which tells upstream to stop sending data. This would cause the buffer to fill up, which in turn tells NIO to stop reading from the socket.
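Roughly, I'm imagining FutureSink like this (a sketch only; it ignores thread safety between the Combine callbacks and the event loop, which I'd still need to sort out):

```swift
import Combine
import NIO

// Sketch: a custom Subscriber that keeps at most `maxInFlight`
// EventLoopFutures pending and requests one more value each time a
// future resolves. Names and details are illustrative.
final class FutureSink<Input>: Subscriber, Cancellable {
    typealias Failure = Error

    private let maxInFlight: Int
    private let makeFuture: (Input) -> EventLoopFuture<Void>
    private var subscription: Subscription?

    init(maxInFlight: Int, _ makeFuture: @escaping (Input) -> EventLoopFuture<Void>) {
        self.maxInFlight = maxInFlight
        self.makeFuture = makeFuture
    }

    func receive(subscription: Subscription) {
        self.subscription = subscription
        // Prime the pipeline with as many values as we allow in flight.
        subscription.request(.max(maxInFlight))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        makeFuture(input).whenComplete { [weak self] _ in
            // One slot freed up: ask upstream for the next value.
            self?.subscription?.request(.max(1))
        }
        return .none
    }

    func receive(completion: Subscribers.Completion<Error>) {
        subscription = nil
    }

    func cancel() {
        subscription?.cancel()
        subscription = nil
    }
}
```

It could then be attached with something like `bufferedPublisher.subscribe(futureSink)`, where bufferedPublisher is whatever the BufferedData stage ends up being.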

I think this will work. It's certainly a lot of customized Combine pieces, and nothing built-in. I could end up making it one single piece instead of three, but breaking it up will keep each piece simpler and unit-testable.

Here is an updated diagram that shows the new FutureSink and its interactions with the data buffer more cleanly.