How to use NonBlockingFileIO for repeated writes

@johannesweiss What prevents the coordinator in https://github.com/apple/swift-nio-examples/blob/master/backpressure-file-io-channel/Sources/FileIOChannelWriteCoordinator.swift from doing the same? Or perhaps in more correct terms, the class using the coordinator from doing the same.

I'm probably not understanding all of it, but to me it looks like

  1. an incoming event could close a file handle
  2. while an ongoing file write would complete with "file already closed" error
  3. which causes the state machine to proceed to closing all resources, thus double closing the handle and causing a crash

So, as you described earlier, a file would be written to at the same time an incoming event signals a close or error of some kind.

That's a great question! The real answer is that there was a bug in the logic, which is now fixed. I noticed that the code got copied, and I thought it important to give examples of how to test state machines anyway (and how easy that is). The bug is now fixed; once this PR is in, the state machine has 100% coverage.

The first version of the state machine did track the resources correctly, so it would never have double-closed anything. However, there was one bug: whilst a write was ongoing (state .writing), it used to transition straight to .error, emitting the .processingCompletedDiscardResources Action. That's incorrect: whilst a write is ongoing, we need to sit on the resources, wait until the write completes, and only then issue that Action.

Remember, just because something is a state machine doesn't make it more correct. The benefits are that tests are easy and debugging is much, much easier (everything is deterministic). The fix I needed to do was just

    internal mutating func didError(_ error: Error) -> Action {
         let oldState = self.state
         self.state = .error(error)
         switch oldState {
-        case .idle, .error:
+        case .idle, .error, .errorWhilstWriting:
             return Action(main: .nothingWeAreWaiting /* for a new request / the channel to go away */, callRead: false)
         case .openingFile:
             return Action(main: .processingCompletedDiscardResources(nil, error), callRead: false)
-        case .readyToWrite(let fileHandle, _), .writing(let fileHandle, _):
+        case .readyToWrite(let fileHandle, _):
             return Action(main: .processingCompletedDiscardResources(fileHandle, error), callRead: false)
+        case .writing(let fileHandle, _):
+            self.state = .errorWhilstWriting(fileHandle, error)
+            return Action(main: .nothingWeAreWaiting /* for the write to complete */, callRead: false)
         }
     }

and the corresponding test was also very straightforward:

    func testErrorArrivesWhilstWriting() {
        XCTAssertTrue(self.coordinator.shouldWeReadMoreDataFromNetwork())
        self.coordinator.didReceiveRequestBegin(targetPath: "/test")
            .assertOpenFile({ path in
                XCTAssertEqual("/test", path)
            })
        .assertDoNotCallRead() // We're waiting for the file to open, so we'd better not buffer more.
        XCTAssertFalse(self.coordinator.shouldWeReadMoreDataFromNetwork())
        self.coordinator.didOpenTargetFile(self.fakeFileHandle)
            .assertCallRead() // Now, we want the body bytes
        XCTAssertTrue(self.coordinator.shouldWeReadMoreDataFromNetwork())
        self.coordinator.didReceiveRequestBodyBytes(self.byteX)
            .assertStartWriting()
            .assertDoNotCallRead() // We're now processing them, so let's wait again

        // Now, we're currently `.writing`, so let's inject the error.
        self.coordinator.didError(DummyError())
            .assertNothing() // We should be told to do nothing and sit tight.
            .assertDoNotCallRead()

        // But now, we're done writing which should surface the error:
        self.coordinator.didFinishWritingOneChunkToFile()
            .assertDiscardResources({ fileHandle, error in
                XCTAssertNotNil(fileHandle)
                XCTAssert(error is DummyError)
            })
    }

Hope that helps :slight_smile:.
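In case it's useful, the deferred-close idea can be boiled down to a tiny self-contained sketch. This is not the actual coordinator's API; all the names here (State, Action, Machine, and the Int standing in for a file handle) are illustrative only:

```swift
// Sketch: a state machine that defers an error arriving mid-write,
// so the file handle is closed exactly once, after the write finishes.
enum State {
    case idle
    case writing(handle: Int)              // Int stands in for a file handle
    case errorWhilstWriting(handle: Int)
    case errored
}

enum Action: Equatable {
    case nothing
    case closeHandle(Int)
}

struct Machine {
    var state: State = .idle

    mutating func didError() -> Action {
        switch self.state {
        case .idle, .errored:
            self.state = .errored
            return .nothing
        case .writing(let handle):
            // Crucially: don't close yet, the in-flight write still owns the handle.
            self.state = .errorWhilstWriting(handle: handle)
            return .nothing
        case .errorWhilstWriting:
            return .nothing
        }
    }

    mutating func didFinishWrite() -> Action {
        switch self.state {
        case .errorWhilstWriting(let handle):
            self.state = .errored
            return .closeHandle(handle)    // closed exactly once, here
        default:
            return .nothing
        }
    }
}

var machine = Machine()
machine.state = .writing(handle: 3)
assert(machine.didError() == .nothing)               // error deferred
assert(machine.didFinishWrite() == .closeHandle(3))  // single close
```

Because every transition is a pure function of (state, event), this shape is trivially testable, which is the whole point made above.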

Why does one have to set up a new NonBlockingFileIO

Thanks for catching this. Updated on gh.

I've just released Release Refactor request body streaming state handling · vapor/vapor · GitHub, which addresses the Precondition failed: blacklisted errno 9 Bad file descriptor error. Request's body.drain method will now wait until the previously returned future has completed before calling the handler again. This means that simply returning the file IO write future in the drain handler will now result in correct behavior.

I've also included a streaming file upload example in the Development module: vapor/routes.swift at main · vapor/vapor · GitHub

Copying it here for posterity:

app.on(.POST, "upload", body: .stream) { req -> EventLoopFuture<HTTPStatus> in
    return req.application.fileio.openFile(
        path: "/Users/tanner/Desktop/foo.txt",
        mode: .write,
        flags: .allowFileCreation(),
        eventLoop: req.eventLoop
    ).flatMap { fileHandle in
        let promise = req.eventLoop.makePromise(of: HTTPStatus.self)
        req.body.drain { part in
            switch part {
            case .buffer(let buffer):
                return req.application.fileio.write(
                    fileHandle: fileHandle,
                    buffer: buffer,
                    eventLoop: req.eventLoop
                )
            case .error(let error):
                promise.fail(error)
                try! fileHandle.close()
                return req.eventLoop.makeSucceededFuture(())
            case .end:
                promise.succeed(.ok)
                try! fileHandle.close()
                return req.eventLoop.makeSucceededFuture(())
            }
        }
        return promise.futureResult
    }
}

Thanks to @johannesweiss for sharing swift-nio-examples/backpressure-file-io-channel at main · apple/swift-nio-examples · GitHub, which I referenced heavily for Vapor's updated body stream state machine.

Sorry for the late response, but thank you very much for this improvement.

Hi, I've been playing with this API and have found that the example code provided fails in many cases because the close() calls can happen before the write calls complete. This amended code seems to work more often:

app.on(.POST, "upload", body: .stream) { req -> EventLoopFuture<HTTPStatus> in
    return req.application.fileio.openFile(
        path: "/Users/tanner/Desktop/foo.txt",
        mode: .write,
        flags: .allowFileCreation(),
        eventLoop: req.eventLoop
    ).flatMap { fileHandle in
        let promise = req.eventLoop.makePromise(of: HTTPStatus.self)
        req.body.drain { part in
            switch part {
            case .buffer(let buffer):
                return req.application.fileio.write(
                    fileHandle: fileHandle,
                    buffer: buffer,
                    eventLoop: req.eventLoop
                )
            case .error(let error):
                return req.eventLoop.submit {
                    promise.fail(error)
                    try fileHandle.close()
                }
            case .end:
                return req.eventLoop.submit {
                    promise.succeed(.ok)
                    try fileHandle.close()
                }
            }
        }
        return promise.futureResult
    }
}

However, my test cases suggest that there are other bugs in drain that I am now investigating.

Edit: yeah, I think drain has issues, and the code above only works by coincidence: drain is not waiting on the promises. Thus the only recommended way to use drain with NIO writes currently is to wait on all the write futures before closing the handle.
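One way to wait on all the write futures before closing, sketched under the assumption that drain delivers every part on req.eventLoop (so the writeFutures array is only ever touched from one thread), is to collect each write future and gate close() on all of them. The path and the use of andAllComplete/andAllSucceed are my choices, not anything the framework mandates:

```swift
app.on(.POST, "upload", body: .stream) { req -> EventLoopFuture<HTTPStatus> in
    return req.application.fileio.openFile(
        path: "/tmp/upload.dat", // illustrative path
        mode: .write,
        flags: .allowFileCreation(),
        eventLoop: req.eventLoop
    ).flatMap { fileHandle in
        let promise = req.eventLoop.makePromise(of: HTTPStatus.self)
        // Every write future ever issued; only mutated on req.eventLoop.
        var writeFutures: [EventLoopFuture<Void>] = []
        req.body.drain { part in
            switch part {
            case .buffer(let buffer):
                let written = req.application.fileio.write(
                    fileHandle: fileHandle,
                    buffer: buffer,
                    eventLoop: req.eventLoop
                )
                writeFutures.append(written)
                return written
            case .error(let error):
                // Don't close until every outstanding write has finished.
                EventLoopFuture.andAllComplete(writeFutures, on: req.eventLoop)
                    .whenComplete { _ in
                        try? fileHandle.close()
                        promise.fail(error)
                    }
                return req.eventLoop.makeSucceededFuture(())
            case .end:
                EventLoopFuture.andAllSucceed(writeFutures, on: req.eventLoop)
                    .whenComplete { result in
                        try? fileHandle.close()
                        switch result {
                        case .success: promise.succeed(.ok)
                        case .failure(let error): promise.fail(error)
                        }
                    }
                return req.eventLoop.makeSucceededFuture(())
            }
        }
        return promise.futureResult
    }
}
```

This way the handle is closed exactly once, and only after no write can still be using it, regardless of whether drain itself serializes the handler calls.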