Hi all,
I'm working on implementing a file transfer protocol in NIO, which will have an incoming stream of ByteBuffer objects (I'm not sure how large the individual ByteBuffers would be in practice, but in total they could reach gigabytes at the extreme). To avoid long delays, I don't want to accumulate all of a file's data in memory from the network before writing it to disk.
I think it would be a fun exercise to use Combine to help with this, but I wanted to get input to make sure I'm on the right track.
Problem 1: Preventing incoming data before the Subscriber is made
So, my first guess would be to use a PassthroughSubject. The ChannelHandler would create a class with some header information as well as a PassthroughSubject, and hold onto a reference to it until the end of the data stream. When data comes in, it would write to the PassthroughSubject, which passes it down the Combine pipeline, which eventually ends in a write to an NIOFileHandle.
The problem, however, is that the Combine pipeline may not be fully set up yet. There may be an asynchronous delay before the code further down the line subscribes to the PassthroughSubject, for example because the file handle is opened asynchronously. As far as I have researched, any data passed into the PassthroughSubject before then would be lost. CurrentValueSubject would only keep the latest ByteBuffer, not all of them.
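To make the concern concrete, here is a minimal sketch of what I mean. It uses plain strings as a hypothetical stand-in for ByteBuffer chunks so that it does not depend on NIO, and it assumes a platform where Combine is available.

```swift
import Combine

// Strings stand in for ByteBuffer chunks (an assumption for illustration only).
let subject = PassthroughSubject<String, Never>()

// Sent before anyone has subscribed: PassthroughSubject does not store it.
subject.send("chunk-1")

// The subscriber attaches "late", e.g. after the file handle was opened.
var received: [String] = []
let cancellable = subject.sink { received.append($0) }

// Sent after the subscriber is attached: this one is delivered.
subject.send("chunk-2")
subject.send(completion: .finished)

print(received) // ["chunk-2"]
```

The first chunk never reaches the sink, which is exactly the data loss I want to avoid for a file transfer.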
So, one solution would be to buffer the incoming data. For solving just this problem, Publishers.Buffer may be suitable (possibly with Int.max for the buffer size, so that it does not drop incoming ByteBuffers).
However, I think that telling NIO to stop reading from the socket until the pipeline is established would be cleaner. This would mean toggling the .autoRead option (see Handling long streams of data, or handling header and body efficiently? - #4 by jamamp). Such a solution sounds like it would necessitate a custom Publisher, Subject, or operator that has callbacks to enable or disable .autoRead in NIO at will.
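As a rough sketch of the callback idea, independent of Combine: the `ReadGate` class below is hypothetical (it is not part of NIO or Combine), and the `setReading` closure is a placeholder for whatever toggles reading on the channel. My assumption is that in a real handler the closure would wrap something like `channel.setOption(ChannelOptions.autoRead, value: ...)`, and that every call happens on the channel's event loop so no locking is needed.

```swift
/// Hypothetical helper: keeps reads paused until the downstream is ready.
/// Assumes all calls are made from a single thread (e.g. the channel's event loop).
final class ReadGate {
    private let setReading: (Bool) -> Void
    private(set) var isOpen = false

    /// `setReading` is a placeholder; in NIO it would presumably toggle autoRead.
    init(setReading: @escaping (Bool) -> Void) {
        self.setReading = setReading
        setReading(false) // start paused so nothing arrives before the pipeline exists
    }

    /// Call once the file handle is open and the pipeline is subscribed.
    func open() {
        guard !isOpen else { return } // ignore repeated calls
        isOpen = true
        setReading(true)
    }
}

// Usage: record the toggles instead of touching a real channel.
var toggles: [Bool] = []
let gate = ReadGate { toggles.append($0) }
gate.open()
gate.open() // no effect the second time
print(toggles) // [false, true]
```

With something like this, the handler would never see body data until `open()` is called, so nothing would need to be buffered on the Combine side.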
Problem 2: Limiting file handle writes?
Writing to an open NIOFileHandle returns an EventLoopFuture<()>, which makes me think that multiple synchronous calls to write ByteBuffers would be performed concurrently or otherwise be cached/queued. If the network is delivering data faster than the storage can ingest it, system memory would be tasked with queuing up the data from the network before it can get written to storage.
As an extension to my "Problem 1" section above, would it be beneficial to have the proposed custom Publisher/Subject/operator also tell NIO to disable .autoRead if a certain threshold is met? I suppose this would be considered "backpressure" in a way. I'm not sure if it makes sense in practice, or how that threshold should be calculated.
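One way I could imagine expressing such a threshold is a pair of high and low watermarks on the number of bytes that have been handed to storage but not yet written. The sketch below is only the bookkeeping: `PendingWriteTracker` and its `Action` values are hypothetical names, the watermark numbers are made up, and the caller would be responsible for turning `.pauseReads` / `.resumeReads` into actual .autoRead changes.

```swift
/// Hypothetical bookkeeping for bytes handed to storage but not yet written.
/// Not thread-safe; assumes it is only touched from one event loop.
struct PendingWriteTracker {
    enum Action: Equatable { case unchanged, pauseReads, resumeReads }

    let highWatermark: Int
    let lowWatermark: Int
    private(set) var pendingBytes = 0
    private(set) var readsPaused = false

    init(highWatermark: Int, lowWatermark: Int) {
        precondition(lowWatermark < highWatermark, "low must be below high")
        self.highWatermark = highWatermark
        self.lowWatermark = lowWatermark
    }

    /// Call when a chunk is passed to the file write.
    mutating func didEnqueue(bytes: Int) -> Action {
        pendingBytes += bytes
        if !readsPaused && pendingBytes >= highWatermark {
            readsPaused = true
            return .pauseReads
        }
        return .unchanged
    }

    /// Call when the write future for a chunk succeeds.
    mutating func didComplete(bytes: Int) -> Action {
        pendingBytes -= bytes
        if readsPaused && pendingBytes <= lowWatermark {
            readsPaused = false
            return .resumeReads
        }
        return .unchanged
    }
}

// Usage with made-up numbers: pause at 8 pending bytes, resume at 2 or fewer.
var tracker = PendingWriteTracker(highWatermark: 8, lowWatermark: 2)
let a1 = tracker.didEnqueue(bytes: 5)  // 5 pending
let a2 = tracker.didEnqueue(bytes: 5)  // 10 pending, crosses the high watermark
let a3 = tracker.didComplete(bytes: 5) // 5 pending, still above the low watermark
let a4 = tracker.didComplete(bytes: 5) // 0 pending, at or below the low watermark
print(a1, a2, a3, a4) // unchanged pauseReads unchanged resumeReads
```

Using two thresholds instead of one avoids flipping .autoRead on every single chunk when the pending amount hovers around the limit.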
If it should be done serially, such as enabling/disabling .autoRead between each successful file write of a ByteBuffer using the EventLoopFutures, would that significantly slow down the system?
Problem 3: Combine
Is this even something that Combine is suited for? I think Combine might be better for relaying events, and not necessarily data. We have to be careful that no data is lost, and Combine seems fine with dropping some data as needed.
I was toying around with just making a manual class to handle all of the above logic myself, but I get worried about implementation details like thread safety.
Closing thoughts
I would like to use Combine (to learn it more) but am not sure if it's the right tool for the job in this scenario. I searched online for "backpressure", but I'm not sure if that term applies to my problem domain here. I'm also not sure how the kernel handles many concurrent writes to a file handle/descriptor, and whether I need to throttle that myself.
Some guidance would be much appreciated! And, if this is much more of a Combine question than NIO, I can post this in a more appropriate area.