Creating a Combine Publisher from an NIO ChannelHandler, and file writing

You're totally correct. It's been a bit since I wrote that, and I've since tunneled down into this rabbit hole, and didn't read my previous thread in full. I'll be sure to take another read when implementing this.

Agreed!

Also totally correct. I think I would implement it in a way that it initially does fill out a buffer before the Subscriber subscribes, so that when the subscription is set up, there's already data available. And this would lend itself well to the problem you described as well.

For example, we fire off the header and begin filling a 5 ByteBuffer buffer of data. Read is disabled. The Subscriber subscribes, and the first ByteBuffer is written. Once complete, the first is removed from the buffer, read is enabled, the next ByteBuffer begins writing to the file system while another ByteBuffer (cached by the kernel for TCP at this point) is retrieved and put into my buffer.

And if I wanted to get really fancy, it could flush the buffered ByteBuffers to the filesystem up to a certain point, and have up to, say, 5 pending writes to the file system at a time, and keep that "buffer" full, and disable NIO's reading while those are pending, if that even happens. This might be unnecessary, though, so I'll see how the simpler version performs first.

As for breaking this out, it seems I need a new buffer publisher or subject that can allow for sending data into the pipeline, as well as callback handlers for enabling/disabling that ingress. The other half would be the dynamic buffering. But the dynamic buffering needs to be the one that tells the upstream publisher not to produce data, so this might be an all-in-one package. Otherwise, I'd have two publishers with callback handlers to tell the upstream to stop sending data. Although, this might be where the Subscribers.Demand could come in, but I'd have to look more into that. Then, the buffer can change between a demand of 0 and >0, then the upstream is what tells NIO to start/stop.

I think this is the next step of my project. Hopefully I can get something working and unit tested, and I'll reply back here with results! If it's useful enough I could make it its own library for others to use (or contribute to other Combine aggregate repos. Speaking of, those might be good to look at for proper implementations of Combine protocols). I wonder if this would be a beneficial bridge from NIO to Combine in general, converting a ChannelHandler to a Publisher with buffering. See Will SwiftNIO adapt to the new Combine framework?. I tried looking for such a thing but was unsuccessful.

Side note: the reason I type all of this out is to help process my thoughts. And if it helps someone else with their problem, that's another plus!

Thanks for the thoughtful and detailed responses, @lukasa!

2 Likes