Thank you. No seriously. Thank you. This is exactly what I was looking for.
So the thing that I was really looking to do was capture the output from a large binary process, something that could be hundreds of GB in size, and long running process (e.g., zfs send) and pipe that to another process (e.g., gpg2 --encrypt) so I could then redirect the output of the second process to a file.
Because of the large size of the first process this was something that could not waitUntilExit. So I've modified your code to make use of DispatchIO to watch for information on the file descriptor of standardOutput of pipe2.
I wanted to share the modifications in case anyone has any pointers. Or was just interested in the code.
import Dispatch
import Foundation
let pipe1 = Pipe()
let pipe2 = Pipe()
let task1 = Process()
task1.launchPath = "/bin/cat"
task1.arguments = ["/dev/zero"]
task1.standardOutput = pipe1
let task2 = Process()
task2.launchPath = "/usr/bin/head"
task2.arguments = ["-c", "100000001"] // 100MB?
task2.standardInput = pipe1
task2.standardOutput = pipe2
let global = DispatchQueue.global(qos: .background)
let channel = DispatchIO(type: .stream, fileDescriptor: pipe2.fileHandleForReading.fileDescriptor, queue: global) { (int) in
DispatchQueue.main.async {
print("Clean-up Handler: \(int)")
exit(EXIT_SUCCESS)
}
}
channel.setLimit(highWater: Int(PAGE_SIZE))
var bytes = 0
channel.read(offset: 0, length: Int.max, queue: global) { (closed, dispatchData, error) in
if let data = dispatchData, !data.isEmpty {
DispatchQueue.main.async {
bytes += data.count
print("Got data! \(data.count) of \(bytes)")
}
}
if closed {
channel.close()
}
}
task1.launch()
task2.launch()
RunLoop.main.run()
This is quick-n-dirty so I do not expect this to be the "final" code. There are probably some threading issues or reference cycles in here but I thought I'd at least share it.
Unfortunately it does not seem to work on Linux it gets stuck on the last 257 bytes (e.g., Got data! 4096 of 99999744). It is not inconceivable that this is a bug in the implementation of read on Linux where it does not trigger when channel is less than highWater.
Not exactly, when I filed that bug I was just using DispatchIO. The error I was seeing with Pipe and Process was where trying to read from a pipe, like in my crappy Discord bot, works in macOS, but fails consistently on Linux.
Oh yes, when talking about your use case yes, they look like the same issue. My earlier comment in this thread was about the example I linked that doesn't use DispatchIO. Sorry for the confusion.
I think I've fixed the underlying issue in libdispatch. It has resolved the issue on my machine at least. I've now run the code above with the patched libdispatch and it has run without fail >10,000 in a row. Without the patch it would fail about 3% of the time.
I was just stumbling into this same issue, wanting to read data off a long running process in Swift/Linux, and found this thread. The PR seems to have hit a bit of a roadblock and the bug is still open, but I'm not familiar enough with the underlying code to understand what the reviewer was suggesting was a better solution to the issue.
SR-9033 is actually a serious bug in Dispatch on Linux that will affect anything that can create EPOLLHUPs, namely UNIX pipes, UNIX domain sockets, as well as TCP sockets .
People keep saying that it causes 100% CPU but my custom Foundation with that patch in it has been working for the better part of a year to stream my ZFS backups to the cloud. My server never gets above 10% CPU usage while it does it.
I'd still love to be able to fix the problem and get it mainlined. I really wish I could just use the community Docker instance instead of rolling my own. I just don't know what else to do at this point. If someone wants to help me understand further I'd love to get it done.