How to use NonBlockingFileIO for repeated writes

I'm writing the body data of HTTP requests and responses to local files in my ChannelInboundHandler subclass, each HTTP message getting its own file. I could add the ByteBuffers from the .body HTTPParts to a combined buffer as they come in, and write the file at the .end part, but this could briefly consume a lot of memory for large HTTP messages. So I want to, instead, have the ByteBuffer written to the file as soon as it comes in without holding on to it unnecessarily.

I'm having difficulty finding a good way to use NonBlockingFileIO and NIOFileHandle for these repeated writes. Basically, as everything happens asynchronously, both opening the NIOFileHandle for a new HTTP message and then writing to it return an EventLoopFuture, and the next write should only happen after the open and/or previous write has completed.

As I need to do these from different invocations of channelRead(context: ChannelHandlerContext, data: NIOAny), I would need to keep a reference to the previous EventLoopFuture or have a queue of pending file writes to make sure they are executed in order, right?
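Something like this is roughly what I have in mind (just a sketch, not verified; it assumes the handler already stores the fileIO and the opened fileHandle):

private var pendingIO: EventLoopFuture<Void>?

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    if case .body(let buffer) = self.unwrapInboundIn(data) {
        // chain the next write onto the open/previous write so they can never overlap or reorder
        self.pendingIO = self.pendingIO?.flatMap {
            self.fileIO.write(fileHandle: self.fileHandle!,
                              buffer: buffer,
                              eventLoop: context.eventLoop)
        }
    }
    context.fireChannelRead(data)
}

Is that the intended pattern, or is there a better way?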

The tests for NonBlockingFileIO only have repeated writes done inside the same loop, and they .wait() for the completion of the write, whereas I cannot .wait(), at least not on the event loop.

What would be the best way to go about this?

[edit] I just found swift-nio-extras/WritePCAPHandler.swift at main · apple/swift-nio-extras · GitHub. It does almost what I need, but does not use NonBlockingFileIO and looks a bit complicated :confused:

3 Likes

That's a great question! The overly simplistic answer is to accumulate all the bits of the HTTP request body you receive in, say, a var receivedBits: CircularBuffer<ByteBuffer> in your ChannelHandler. Once you have received all of them, you write them to disk one after the other using NonBlockingFileIO.
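In code, that naive version would look roughly like this (sketch only, assuming the handler has already opened a fileHandle for this message; please don't ship this):

private var receivedBits = CircularBuffer<ByteBuffer>(initialCapacity: 16)

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    switch self.unwrapInboundIn(data) {
    case .head:
        break // open the file here
    case .body(let buffer):
        self.receivedBits.append(buffer) // unbounded in-memory buffering: this is the problem
    case .end:
        // write the chunks one after the other by chaining the futures
        var flushed = context.eventLoop.makeSucceededFuture(())
        while !self.receivedBits.isEmpty {
            let chunk = self.receivedBits.removeFirst()
            flushed = flushed.flatMap {
                self.fileIO.write(fileHandle: self.fileHandle,
                                  buffer: chunk,
                                  eventLoop: context.eventLoop)
            }
        }
        flushed.whenComplete { _ in try? self.fileHandle.close() }
    }
}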

The reason I called this overly simplistic is because that would leave your server vulnerable to a fairly straightforward denial of service attack: If writing to disk is slower than the client sending you data, your server will eventually die because it runs out of memory.

To solve this problem, we need to exert backpressure from the file system APIs through the Channel, over TCP to the client sending this data.

[[ beginning of a few paragraphs copy & pasted from my example implementation ]]
First, let's establish what it means to propagate backpressure from the file system into the Channel: Let's assume we have an HTTP server that accepts arbitrary amounts of data and writes it to the file system. If data is received faster over the network than we can write it to the disk, then the server runs into trouble: It can now only either buffer the data in memory or (at least in theory) drop it on the floor. The former would easily be usable as a denial of service exploit, the latter means that the server isn't able to provide its core functionality.

Backpressure is the mechanism that addresses the buffering issue above. The idea is that the server stops accepting more data from the client than it can write to disk. Because HTTP runs over TCP, which has flow control built in, the TCP stacks will then lower the server's receive window size, which means that the client gets slowed down or completely stopped from sending any data. Once the server finishes writing previously received data to disk, it starts draining the receive buffer, which then makes TCP's flow control raise the window sizes again, which allows the client to send further data.

Backpressure in SwiftNIO

In SwiftNIO, backpressure is propagated by stopping the outbound read event from being forwarded. By default, Channels in SwiftNIO have the autoRead ChannelOption enabled. When autoRead is enabled, SwiftNIO will automatically send a read event (note, this is a very different event from the inbound channelRead event that is used to deliver data) when the previous read burst has completed (signalled by the inbound channelReadComplete event).
Therefore, you may be unaware of the existence of the read event despite having used SwiftNIO before. Suppressing the read event is one of the key demonstrations of this example. The fundamental idea is that to start with we let read flow through the ChannelPipeline until we have an HTTP request and the first bits of its request body. Once we have received the first bits of the HTTP request body, we _suppress_ read from flowing through the ChannelPipeline, which means that SwiftNIO will stop reading further data from the network.

When SwiftNIO stops reading further data from the network, TCP flow control will kick in and slow the client down from sending more of the HTTP request body (once both the client's send and the server's receive buffers are full). Once the disk writes of the previously received chunks have completed, we will issue a read event (assuming we held up at least one). From then on, read events will flow freely until the next bit of the HTTP request body is received, when they'll be suppressed again.

This means that however fast the client or however slow the disk is, we should be able to stream arbitrarily sized HTTP request bodies to disk in constant memory.
[[ c&p end ]]

Okay, how do we implement this?

The implementation idea goes like this (a rough skeleton of such a handler follows the list):

  • Once we receive the HTTP request head, we open the file
  • Once we receive a body chunk, we write it to the file and start buffering any further ones that may arrive
  • Whilst we're writing to disk, we suppress read to slow the client down
  • When we finish writing the previously received chunks, we read again and continue
  • If there are any errors or we're done, we close the file and respond accordingly
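Here's a heavily abridged skeleton of that shape (not the example's actual code; the flags and the chunk buffer are made up for illustration, and all the open/close/response handling is elided):

import NIO
import NIOHTTP1

final class SaveBodyToDiskHandler: ChannelDuplexHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias InboundOut = Never
    typealias OutboundIn = Never
    typealias OutboundOut = HTTPServerResponsePart

    private let fileIO: NonBlockingFileIO
    private var fileHandle: NIOFileHandle?
    private var bufferedChunks = CircularBuffer<ByteBuffer>(initialCapacity: 8)
    private var writeInProgress = false
    private var heldUpRead = false

    init(fileIO: NonBlockingFileIO) {
        self.fileIO = fileIO
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.unwrapInboundIn(data) {
        case .head:
            break // 1. open the file via fileIO.openFile (elided)
        case .body(let chunk):
            // 2. buffer the chunk and kick off a disk write if none is in flight
            self.bufferedChunks.append(chunk)
            self.writeNextChunkIfIdle(context: context)
        case .end:
            break // 5. once all writes are done: close the file, send a response (elided)
        }
    }

    func read(context: ChannelHandlerContext) {
        // 3. whilst a disk write is in flight, hold the read back instead of forwarding it
        if self.writeInProgress {
            self.heldUpRead = true
        } else {
            context.read()
        }
    }

    private func writeNextChunkIfIdle(context: ChannelHandlerContext) {
        guard !self.writeInProgress,
              !self.bufferedChunks.isEmpty,
              let fileHandle = self.fileHandle else {
            return
        }
        self.writeInProgress = true
        let chunk = self.bufferedChunks.removeFirst()
        self.fileIO.write(fileHandle: fileHandle, buffer: chunk, eventLoop: context.eventLoop).whenComplete { result in
            self.writeInProgress = false
            switch result {
            case .success:
                if self.bufferedChunks.isEmpty && self.heldUpRead {
                    // 4. everything flushed: replay the held-up read so TCP opens the window again
                    self.heldUpRead = false
                    context.read()
                }
                self.writeNextChunkIfIdle(context: context)
            case .failure:
                break // close the file and fail the request (elided)
            }
        }
    }
}

The real implementation replaces those two Bools and the ad-hoc chunk buffer with the explicit state machine discussed below, plus all the open/close/error handling.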

How much code is this?

Quite a bit. As an example, I've implemented a simple HTTP server (don't use this in production!) that just saves anything it receives to a file in /tmp/uploaded_file_*.

Most of the code is fairly straightforward, but it does contain an almost 250 line long state machine that coordinates the whole receive/backpressure/write-to-disk spiel. It is totally possible to knock up an ad-hoc version of this directly inside a ChannelHandler, but I feel this is complex enough that it warrants a completely I/O- and side-effect-free state machine that is super straightforward to test (and also debug if something goes wrong).

Why is this so hard?

This is so hard because we're combining two (incompatible) streaming pieces - a Channel and a NonBlockingFileIO - in a non-trivial way. Sure, they both come out of the swift-nio package, but SwiftNIO's core mission is to deal with everything that happens inside a Channel. We only ship NonBlockingFileIO because there's no real alternative thus far, so we had to include it, but it's definitely not anything that we consider the core of our API... So once you have to glue that Channel to, say, a NonBlockingFileIO, you've got to write the glue yourself.

And writing glue for two asynchronous pieces that can both deliver a variety of events and fail at any time isn't that easy. It's sometimes easy to forget that events can arrive in really inconvenient and unexpected orders :slight_smile:. Therefore, we always suggest using explicit state machines; that looks verbose but it's the right call if you're building something that's more than just a toy. To learn more, my colleague @lukasa gave a wonderful talk about state machines that I can recommend to anybody who wants to retain their sanity whilst writing event-driven software :slight_smile:.
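Just to give a flavour of what "I/O- and side-effect-free" means here (massively abridged, made-up names, not the example's actual code):

import NIO

struct UploadStateMachine {
    enum Action {
        case startWrite(ByteBuffer) // caller kicks off fileIO.write with this chunk
        case nothing                // just keep buffering / waiting
    }

    private enum State {
        case idle
        case writing(pendingChunks: CircularBuffer<ByteBuffer>)
        case errored
    }

    private var state: State = .idle

    // called from channelRead; returns what side effect the caller should perform
    mutating func didReceiveBodyChunk(_ chunk: ByteBuffer) -> Action {
        switch self.state {
        case .idle:
            self.state = .writing(pendingChunks: CircularBuffer(initialCapacity: 4))
            return .startWrite(chunk)
        case .writing(var pending):
            pending.append(chunk)
            self.state = .writing(pendingChunks: pending)
            return .nothing
        case .errored:
            return .nothing
        }
    }

    // didCompleteOneWrite(), didReceiveEnd(), didError(), shouldWeReadMoreDataFromNetwork()
    // etc. would follow the same pattern and are elided here.
}

Because it holds no file handles, futures or contexts, every transition can be unit-tested by just feeding it events and asserting on what comes back (the shouldWeReadMoreDataFromNetwork() call in the diff further down this thread is exactly that kind of query).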

IMHO, the right answer for the future here is to get the ReactiveStreams game going on all platforms and offer standard adapters for both SwiftNIO's ChannelPipeline as well as NonBlockingFileIO or hopefully something much better. Once we've got that going, it should be as straightforward as plugging two pieces together in a few lines of code.

9 Likes

I still think that Node.js v3 streams are a really awesome abstraction to deal w/ the back pressure problem (Noze.io implements them for Swift but w/o NIO, and it is really slow for various reasons).
Would love to see something as general for NIO (I'd like to eventually finish the Noze port to NIO w/ Macro, but that will probably take quite a while :slight_smile:)

This is a great introduction to streams: https://github.com/substack/stream-handbook (I haven't seen a better system for this issue so far, recommendations are welcome!)

1 Like

I did this, where I'm just writing the incoming data buffers of requests and responses to a NIOFileHandle that I open when receiving the .head part. I'm basically discarding the future that comes back from the write.

_ = fileIO.write(fileHandle: fileHandle, buffer: buffer, eventLoop: context.eventLoop)

Finally, I'm closing the NIOFileHandle in the handler's deinit.

It does seem to work, although I haven't yet looked at it that closely to be sure. What would go wrong with a simple approach like this? What I'm wondering is, will writes to the same open NIOFileHandle get queued up and executed serially, or will they wreak havoc on each other?

To be clear I'm calling write for a single NIOFileHandle always from the same channel, since every HTTP request and response will get a separate file.

Hmm, that can't be quite right because you then can't know when you're ready to write the next chunk.

Well, it probably appears to work, but it's not a good idea to use deinit to manage scarce resources like file descriptors. You want them to be closed as soon as possible. The handler, however, will stay in the pipeline until the Channel closes, so with HTTP/1 (keep-alive) potentially forever.

You won't have backpressure and likely you're also corrupting the file because you'll write to it multiple times concurrently.

It will not. If you happen to have a NIOThreadPool with just 1 thread, then it may appear to work, because then the work for the thread pool does indeed get enqueued in order and only one write will run at a time (because the writes are blocking). That is basically exactly what I described as the 'overly simplistic' approach, except that you're using NIOThreadPool's work queue as the chunk queue. Again, this will only work for 1 thread. And even with 1 thread, you'll be missing the backpressure, which means it's pretty easy to denial-of-service your server.

1 Like

I'll take any streaming abstraction that's async, does backpressure, and is widely adopted by the community :slight_smile:. I have my opinions about a bunch of API choices in existing ones, but my biggest opinion is that the Swift project & community needs to get this problem sorted.

4 Likes

Thanks for pointing out the issue with the file close. I can easily move it to where the .end part arrives, to not keep it around too long.

I should say I’m not making a large scale server, but a simple proxy. So I would prefer request handling to just block in the (extremely unlikely) case that file writing cannot keep up with network traffic. Is there any way to do that?

I probably had 2 threads for the NIOThreadPool. I think what I’ll do (again, in my very simple use case) is I’ll lower that to 1 thread and call it a day.

That may be too early because previous disk writes may still be in progress. You need to close when you're done with all enqueued writes. And obviously also don't forget to close if an error comes through the ChannelPipeline or one of the disk writes fails.
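A sketch of what I mean (made-up names, assuming the handler keeps the future of the last enqueued write around as pendingIO, like in your earlier snippet):

private func closeFileWhenDone(context: ChannelHandlerContext) {
    let pending = self.pendingIO ?? context.eventLoop.makeSucceededFuture(())
    pending.whenComplete { _ in
        if let handle = self.fileHandle {
            self.fileHandle = nil // never close the same handle twice
            try? handle.close()
        }
    }
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
    // also close on pipeline errors, not only when the .end part arrives
    self.closeFileWhenDone(context: context)
    context.fireErrorCaught(error)
}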

It's definitely not extremely unlikely but rather likely, especially if you use that proxy on a local machine.

I did the following tiny change in my program

diff --git a/backpressure-file-io-channel/Sources/BackpressureChannelToFileIO/SaveEverythingHTTPServer.swift b/backpressure-file-io-channel/Sources/BackpressureChannelToFileIO/SaveEverythingHTTPServer.swift
index da36b2a..0a72229 100644
--- a/backpressure-file-io-channel/Sources/BackpressureChannelToFileIO/SaveEverythingHTTPServer.swift
+++ b/backpressure-file-io-channel/Sources/BackpressureChannelToFileIO/SaveEverythingHTTPServer.swift
@@ -137,9 +137,9 @@ extension SaveEverythingHTTPServer: ChannelDuplexHandler {
     }
 
     func read(context: ChannelHandlerContext) {
-        if self.state.shouldWeReadMoreDataFromNetwork() {
+        //if self.state.shouldWeReadMoreDataFromNetwork() {
             context.read()
-        }
+        //}
     }

which switches off the backpressure, and then ran a quick

curl --data-binary @/tmp/4gb_long_file http://localhost:8080/foo

These are the results:

With backpressure: [memory usage graph]

And without (after that patch above): [memory usage graph]

eventually (but very slowly because we use so much memory) going to: [memory usage graph]

So peak memory usage is 2.6 MB vs. 3 GB (note Mega vs. Giga). The non-backpressure version is also much slower because everything takes longer to process, thanks to all the memory we have to acquire/release/cache-miss/possibly swap, etc.

No, SwiftNIO is a non-blocking system. If you literally block the event loop, nothing will happen anymore, i.e. other requests and any I/O (even writes from yourself) will stall completely, even for unrelated requests.

In a fully synchronous system, you can indeed use blocking as a means of backpressure. However, you will find that in a proxy you would then actually need at least two threads per request because you don't know whether you should read the request body from the client or the response body from the server. To make sure you don't suffer complete stalls you'd need to do both (blocking 2 threads per request).

If that still sounds appealing to you, then you can actually build a fully blocking system by writing a blocking adapter for SwiftNIO. So for a Channel you'd spawn a 'buddy thread' which controls what's going on by blocking. It could make NIO mirror that by sending/suppressing the relevant events.

Maybe the following diagram gives you an idea:

This will work totally fine for very simple systems that just basically do

while true {
   let thing = read()
   write(thing)
}

The key property being that you always exactly know what you need to do next: Either you want to read or you want to write what you've read before.

In the real world however, these systems are pretty rare and a proxy is not one of them. You kind of need two of the loops above, one for the request and one for the response (they can both go on indefinitely at the same time, say if you stream both ways). That can still be solved by just burning 2 threads and adding a bunch of locks. But to be honest, you'll need a state machine again to manage exactly what those threads are doing. Because for example if one of them hits an error, the other one now needs to be told to stop. That's actually also pretty hard in blocking systems because the other thread may actually be blocking (in read or write) for an indefinite amount of time. That now means you can't close the file descriptor, so you'll need a state machine that tells you once both threads have finalised. If you want to wait for both of those threads to finish, you may actually need a third thread, ... I guess you get the idea :slight_smile:. That, by the way, is the reason that UNIX's first eventing API, select, came at the same time as BSD Sockets. Back then, everything was blocking because they just didn't have any high-scale network services. And even then they introduced an inversion-of-control, asynchronous API.
Yes, you can in theory keep all the synchronous APIs and use cross-thread/process UNIX signals to interrupt the other threads/processes that you want to wake up, but that'll get you into an even deeper mess :smiley:. This article, which deals with pretty much this topic (it's about multiplexing terminal I/O and network I/O rather than 2 network channels), is actually pretty good.

Yeah, 1 thread will at least appear to work correctly. However, you will only be able to do one write (to disk) call at a time. That means you're more likely to buffer more received body bits in memory, which means you're more likely to use crazy amounts of memory and then probably crash at some point.

3 Likes

Apologies for the drive by comment - this was posted in Vapor's Discord recently and may be of interest:

2 Likes

Can someone confirm, that this is a valid solution and the way to go at the moment? :thinking:

Why does one have to setup a new NonBlockingFileIO at VaporUploads/StreamController.swift at master · mcritz/VaporUploads · GitHub instead of using the one in Vapor.Application?

I'm loving Vapor, but do not have the time to look deeper into every bit, Klaas.

@tanner0101 would need to help here. The key bit is body.drain which implements some kind of streaming and needs a similar state machine in its implementation if it's fully working.

1 Like

After getting too many Precondition failed: blacklisted errno 9 Bad file descriptor in write(descriptor:pointer:size:) errors, I switched to using old school dispatch queues, NSFileManager and NSFileHandle again. I'm still using the body draining from the mcritz/VaporUploads repository.

In the future I would prefer to use NIOFileHandles, but couldn't figure out how.

1 Like

"Bad file descriptor" errors are very bad. Does this happen with Vapor's body.drain?

Can you send the stack traces of the crashes?

I did some further investigations. The crash happens not on the first upload, but on one of the subsequent ones. Just use curl to upload some files (or the same file) repeatedly.

The crash can easily be reproduced with the current version of GitHub - mcritz/VaporUploads: Demonstrating uploads in Vapor 4. Particularly large streaming uploads.

In case of an error the file handle is closed here: VaporUploads/StreamController.swift at 3414f5960351b0c45feb55e48058bc756cd6001a · mcritz/VaporUploads · GitHub

In the case of a successful stream end, the file handle is not closed! Adding that close was a change I had made while adopting the code, more or less without noticing. By adding try? fHand.close() to the .end case, the crash happens.

Closing the file handle frees the underlying file descriptor: some logging shows that the next file handle gets the same descriptor number. And this leads to Precondition failed: blacklisted errno 9 Bad file descriptor in write(descriptor:pointer:size:).

Here is a stack trace from the modified VaporUploads:

#0 0x00007fff6d469380 in _swift_runtime_on_report ()
#1 0x00007fff6d4e3243 in _swift_stdlib_reportFatalErrorInFile ()
#2 0x00007fff6d1aa9de in closure #1 in closure #1 in closure #1 in assertionFailure(:_:file:line:flags:) ()
#3 0x00007fff6d1aa5e7 in closure #1 in closure #1 in assertionFailure(:_:file:line:flags:) ()
#4 0x00007fff6d1aabd7 in specialized String.withUTF8(_:) ()
#5 0x00007fff6d1a91c0 in assertionFailure(:_:file:line:flags:) ()
#6 0x0000000100221b91 in preconditionIsNotBlacklistedErrno(err:where:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/System.swift:112
#7 0x000000010022252e in syscall(blocking:where:_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/System.swift:136
#8 0x0000000100225aee in static Posix.write(descriptor:pointer:size:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/System.swift:334
#9 0x00000001001b5729 in closure #1 in closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NonBlockingFileIO.swift:442
#10 0x00000001001b65eb in partial apply for closure #1 in closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) ()
#11 0x00000001001b39f3 in thunk for @callee_guaranteed (@unowned Int32) -> (@unowned IOResult, @error @owned Error) ()
#12 0x00000001001b6614 in partial apply for thunk for @callee_guaranteed (@unowned Int32) -> (@unowned IOResult, @error @owned Error) ()
#13 0x000000010017d182 in NIOFileHandle.withUnsafeFileDescriptor(_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/FileHandle.swift:79
#14 0x00000001001b4fa0 in closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NonBlockingFileIO.swift:435
#15 0x00000001001b65b7 in partial apply for closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) ()
#16 0x00000001000ee3ea in closure #1 in ByteBuffer.readWithUnsafeReadableBytes(_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/ByteBuffer-aux.swift:261
#17 0x00000001000ee444 in partial apply for closure #1 in ByteBuffer.readWithUnsafeReadableBytes(_:) ()
#18 0x00000001000ee463 in thunk for @callee_guaranteed (@unowned UnsafeRawBufferPointer) -> (@unowned Int, @error @owned Error) ()
#19 0x00000001000ee4c4 in partial apply for thunk for @callee_guaranteed (@unowned UnsafeRawBufferPointer) -> (@unowned Int, @error @owned Error) ()
#20 0x00000001000f8593 in ByteBuffer.withUnsafeReadableBytes(_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/ByteBuffer-core.swift:570
#21 0x00000001000ee301 in ByteBuffer.readWithUnsafeReadableBytes(_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/ByteBuffer-aux.swift:261
#22 0x00000001001b4b63 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NonBlockingFileIO.swift:433
#23 0x00000001001b6530 in partial apply for closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:) ()
#24 0x00000001000c356f in thunk for @escaping @callee_guaranteed () -> (@error @owned Error) ()
#25 0x00000001001b6584 in thunk for @escaping @callee_guaranteed () -> (@error @owned Error)partial apply ()
#26 0x00000001001ade70 in closure #1 in NIOThreadPool.runIfActive(eventLoop:_:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NIOThreadPool.swift:219
#27 0x00000001001ab63f in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> () ()
#28 0x00000001001af721 in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> ()partial apply ()
#29 0x00000001001aaaa8 in thunk for @escaping @callee_guaranteed (@in_guaranteed NIOThreadPool.WorkItemState) -> (@out ()) ()
#30 0x00000001001ab63f in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> () ()
#31 0x00000001001af6d1 in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> ()partial apply ()
#32 0x00000001001aaaa8 in thunk for @escaping @callee_guaranteed (@in_guaranteed NIOThreadPool.WorkItemState) -> (@out ()) ()
#33 0x00000001001ab63f in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> () ()
#34 0x00000001001af531 in thunk for @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> ()partial apply ()
#35 0x00000001001aaaa8 in thunk for @escaping @callee_guaranteed (@in_guaranteed NIOThreadPool.WorkItemState) -> (@out ()) ()
#36 0x00000001001ac5e9 in closure #2 in NIOThreadPool.process(identifier:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NIOThreadPool.swift:150
#37 0x00000001001ab856 in thunk for @callee_guaranteed (@guaranteed @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> ()) -> (@error @owned Error) ()
#38 0x00000001001af414 in thunk for @callee_guaranteed (@guaranteed @escaping @callee_guaranteed (@unowned NIOThreadPool.WorkItemState) -> ()) -> (@error @owned Error)partial apply ()
#39 0x00007fff6d19c0ee in Optional.map(_:) ()
#40 0x00000001001abf25 in NIOThreadPool.process(identifier:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NIOThreadPool.swift:150
#41 0x00000001001ad48e in closure #3 in NIOThreadPool.start() at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/NIOThreadPool.swift:184
#42 0x00000001001614df in thunk for @escaping @callee_guaranteed (@guaranteed NIOThread) -> () ()
#43 0x000000010022a0e1 in partial apply for thunk for @escaping @callee_guaranteed (@guaranteed NIOThread) -> () ()
#44 0x0000000100230f53 in closure #1 in static ThreadOpsPosix.run(handle:args:detachThread:) at /Users/coder/Library/Developer/Xcode/DerivedData/VaporUploads-ajmxddgcdmrnukafnxpqtkkguunf/SourcePackages/checkouts/swift-nio/Sources/NIO/ThreadPosix.swift:105
#45 0x0000000100231089 in @objc closure #1 in static ThreadOpsPosix.run(handle:args:detachThread:) ()
#46 0x0000000101ff1c65 in _pthread_start ()
#47 0x0000000101fed4af in thread_start ()

cc @mcritz

Hi Johannes,

I'm using async-http-client with a delegate to save the response to disk. If I return an EventLoopFuture from the didReceive... functions that is fulfilled when the file write is done, do you know whether this will deal with the backpressure? Comments imply this is the case, I just wanted to verify that it's correct.

That is correct. I think there's even a test case. AHC basically implements some form of (non-composable) streaming with its delegate. It's the same form of ad-hoc streaming that NIO's NonBlockingFileIO does with readChunked.
And Vapor's body.drain does that again (which actually took the state machine from this example and simplified it because it doesn't handle open/close file). That also is exactly the problem: we have to solve this issue anew every single time. If we had one available reactive streaming abstraction (that supports backpressure), then we'd solve it once, there'd be an adapter that connects the NIO pipeline to the streaming bit, and that's it.
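For illustration, if I remember the delegate shape correctly, the pattern looks roughly like this (sketch only; the fileIO and fileHandle are assumed to be created/opened elsewhere):

import AsyncHTTPClient
import NIO
import NIOHTTP1

// a delegate that streams the response body to disk; returning the write's
// future from didReceiveBodyPart is what makes AHC wait before delivering
// the next chunk, i.e. the backpressure
final class SaveToFileDelegate: HTTPClientResponseDelegate {
    typealias Response = Void

    private let fileIO: NonBlockingFileIO
    private let fileHandle: NIOFileHandle // assumed to be opened elsewhere

    init(fileIO: NonBlockingFileIO, fileHandle: NIOFileHandle) {
        self.fileIO = fileIO
        self.fileHandle = fileHandle
    }

    func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
        // AHC will not deliver the next chunk until this future completes
        return self.fileIO.write(fileHandle: self.fileHandle,
                                 buffer: buffer,
                                 eventLoop: task.eventLoop)
    }

    func didFinishRequest(task: HTTPClient.Task<Response>) throws {
        try self.fileHandle.close()
    }
}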

This is a real issue (probably security relevant). And the problem is almost certainly not in Vapor/NIO even if you see it only when using Vapor's body.drain. What you're seeing here is almost certainly a file descriptor double-close.

Two questions:

  • is the program that exhibits this issue available somewhere?
  • are you using Glibc/Darwin's close function or Foundation's FileHandle anywhere in this application?

The reason I'm almost certain that it's not in Vapor/NIO is that both use NIOFileHandle under the hood and NIOFileHandle checks (with assertions) that a) every file descriptor is closed b) every file descriptor is only closed once.

1 Like

Just created a fork of the original repository, added the missing file handle closing and the steps to reproduce at the beginning of the README: GitHub - klaas/VaporUploads: Demonstrating uploads in Vapor 4. Particularly large streaming uploads.

No, not that I could find anything. But this is not my application. It's a sample from @mcritz .

Thanks so much @klaas for your amazing reproduction.

The problem that's going on here is that whilst NonBlockingFileIO is writing to a file descriptor, another thread closes the same file descriptor. In other words, a fileIO.write has been started, and before its returned future completes, another thread calls close on it.

The close that's not ordered correctly after the write is coming from closure #1 in Request.Body.drain which calls StreamController.upload(req:)

  0    170                      close:entry close(16)
              libsystem_kernel.dylib`close+0xa
              Run`static Posix.close(descriptor:)+0x57
              Run`closure #1 in NIOFileHandle.close()+0x19
              Run`thunk for @callee_guaranteed (@unowned Int32) -> (@error @owned Error)+0xf
              Run`partial apply for thunk for @callee_guaranteed (@unowned Int32) -> (@error @owned Error)+0x14
              Run`NIOFileHandle.withUnsafeFileDescriptor<A>(_:)+0xe2
              Run`NIOFileHandle.close()+0x54
              Run`closure #1 in closure #1 in StreamController.upload(req:)+0x797
              Run`partial apply for closure #1 in closure #1 in StreamController.upload(req:)+0x38
              Run`closure #1 in Request.Body.drain(_:)+0x9d
              Run`Request.BodyStream.write(_:promise:)+0x21f
              Run`protocol witness for BodyStreamWriter.write(_:promise:) in conformance Request.BodyStream+0x1d
              Run`BodyStreamWriter.write(_:)+0x1e1
              Run`HTTPServerRequestDecoder.write(_:to:context:)+0x15e
              Run`HTTPServerRequestDecoder.channelRead(context:data:)+0x1e78
              Run`protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance HTTPServerRequestDecoder+0x9
              Run`ChannelHandlerContext.invokeChannelRead(_:)+0x14d
              Run`ChannelHandlerContext.fireChannelRead(_:)+0xc6
              Run`HTTPDecoder.didFinishMessage()+0x366
              Run`protocol witness for HTTPDecoderDelegate.didFinishMessage() in conformance HTTPDecoder<A, B>+0x10

So either body.drain or the StreamController has quite a severe bug. CC @tanner0101/@mcritz.


Here's the dtrace I used to debug this:

sudo dtrace -n 'syscall::close:entry / pid == $target / { printf("%s(%d)", probefunc, arg0); ustack(); } syscall::close:return / pid == $target / { printf("%d", errno); ustack(); } syscall::write:entry / pid == $target / { printf("%s(%d)", probefunc, arg0); } syscall::write:return / pid == $target / { printf("%d", errno); ustack(); } ' -p $(pgrep Run)

and this is the relevant part of the output

  0    170                      close:entry close(16)
              libsystem_kernel.dylib`close+0xa
              Run`static Posix.close(descriptor:)+0x57
              Run`closure #1 in NIOFileHandle.close()+0x19
              Run`thunk for @callee_guaranteed (@unowned Int32) -> (@error @owned Error)+0xf
              Run`partial apply for thunk for @callee_guaranteed (@unowned Int32) -> (@error @owned Error)+0x14
              Run`NIOFileHandle.withUnsafeFileDescriptor<A>(_:)+0xe2
              Run`NIOFileHandle.close()+0x54
              Run`closure #1 in closure #1 in StreamController.upload(req:)+0x797
              Run`partial apply for closure #1 in closure #1 in StreamController.upload(req:)+0x38
              Run`closure #1 in Request.Body.drain(_:)+0x9d
              Run`Request.BodyStream.write(_:promise:)+0x21f
              Run`protocol witness for BodyStreamWriter.write(_:promise:) in conformance Request.BodyStream+0x1d
              Run`BodyStreamWriter.write(_:)+0x1e1
              Run`HTTPServerRequestDecoder.write(_:to:context:)+0x15e
              Run`HTTPServerRequestDecoder.channelRead(context:data:)+0x1e78
              Run`protocol witness for _ChannelInboundHandler.channelRead(context:data:) in conformance HTTPServerRequestDecoder+0x9
              Run`ChannelHandlerContext.invokeChannelRead(_:)+0x14d
              Run`ChannelHandlerContext.fireChannelRead(_:)+0xc6
              Run`HTTPDecoder.didFinishMessage()+0x366
              Run`protocol witness for HTTPDecoderDelegate.didFinishMessage() in conformance HTTPDecoder<A, B>+0x10

  2    166                      write:entry write(16)
  2    167                     write:return 9
              libsystem_kernel.dylib`write+0xa
              Run`closure #1 in static Posix.write(descriptor:pointer:size:)+0x6c
              Run`thunk for @callee_guaranteed () -> (@unowned Int, @error @owned Error)+0x13
              Run`partial apply for thunk for @callee_guaranteed () -> (@unowned Int, @error @owned Error)+0x14
              Run`syscall<A>(blocking:where:_:)+0x1a2
              Run`static Posix.write(descriptor:pointer:size:)+0xfe
              Run`closure #1 in closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:)+0x4a9
              Run`partial apply for closure #1 in closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:)+0x2b
              Run`thunk for @callee_guaranteed (@unowned Int32) -> (@unowned IOResult<Int>, @error @owned Error)+0x13
              Run`partial apply for thunk for @callee_guaranteed (@unowned Int32) -> (@unowned IOResult<Int>, @error @owned Error)+0x14
              Run`NIOFileHandle.withUnsafeFileDescriptor<A>(_:)+0xe2
              Run`closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:)+0x310
              Run`partial apply for closure #1 in closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:)+0x27
              Run`closure #1 in ByteBuffer.readWithUnsafeReadableBytes(_:)+0x2a
              Run`partial apply for closure #1 in ByteBuffer.readWithUnsafeReadableBytes(_:)+0x14
              Run`thunk for @callee_guaranteed (@unowned UnsafeRawBufferPointer) -> (@unowned Int, @error @owned Error)+0x13
              Run`partial apply for thunk for @callee_guaranteed (@unowned UnsafeRawBufferPointer) -> (@unowned Int, @error @owned Error)+0x14
              Run`ByteBuffer.withUnsafeReadableBytes<A>(_:)+0x3c3
              Run`ByteBuffer.readWithUnsafeReadableBytes(_:)+0x131
              Run`closure #1 in NonBlockingFileIO.write0(fileHandle:toOffset:buffer:eventLoop:)+0x2b3

Whilst SwiftNIO's docs clearly point out (see the warning) that NIOFileHandle isn't automagically thread-safe, SwiftNIO could possibly store some extra state that would make debugging this kind of threading issue a bit easier.

2 Likes