Pipe Child Processes Together

How would one go about piping the output of one child Process into another? Is this something that Foundation supports?

// Pseudo ...
let a = Process()
let b = Process()

b.standardInput = a.standardOutput

Is this a situation where I will need to drop down to posix_spawnp and handle all of this myself?

Yes, it is possible with Pipe. Here is an example for the command pwd | rev, which prints the current working directory in reverse:

import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

let task1 = Process()
task1.launchPath = "/bin/pwd"
task1.standardOutput = pipe1

let task2 = Process()
task2.launchPath = "/usr/bin/rev"
task2.standardInput = pipe1
task2.standardOutput = pipe2

task1.launch()
task2.launch()
task1.waitUntilExit()
task2.waitUntilExit()

let data = pipe2.fileHandleForReading.readDataToEndOfFile()
if let output = String(data: data, encoding: .utf8) {
    print(output)
}
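For readers on newer toolchains: launchPath and launch() have since been deprecated in favor of executableURL and run() (which throws instead of raising an exception). A sketch of the same pwd | rev pipeline with the newer API:

```swift
import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

let task1 = Process()
task1.executableURL = URL(fileURLWithPath: "/bin/pwd")
task1.standardOutput = pipe1

let task2 = Process()
task2.executableURL = URL(fileURLWithPath: "/usr/bin/rev")
task2.standardInput = pipe1
task2.standardOutput = pipe2

// run() throws if the executable cannot be spawned.
try task1.run()
try task2.run()
task1.waitUntilExit()
task2.waitUntilExit()

let data = pipe2.fileHandleForReading.readDataToEndOfFile()
let output = String(data: data, encoding: .utf8) ?? ""
print(output)
```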

Thank you. No seriously. Thank you. This is exactly what I was looking for.

So the thing I was really looking to do was capture the output of a large, long-running binary process, something that could be hundreds of GB in size (e.g., zfs send), pipe that into another process (e.g., gpg2 --encrypt), and then redirect the output of the second process to a file.

Because of the size of the first process's output, this is not something that can simply waitUntilExit. So I've modified your code to use DispatchIO to watch for data on the file descriptor of pipe2's reading end.
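Worth noting: for the final redirect-to-a-file step, Foundation also accepts a FileHandle (not just a Pipe) as standardOutput, so the downstream process can write straight to disk and the parent never touches the stream; in that arrangement waitUntilExit is safe even for huge transfers, because nothing backs up into the parent. A sketch with echo and gzip standing in for zfs send and gpg2 --encrypt (the real command lines are placeholders I have not run):

```swift
import Foundation

// Stand-ins for the real commands: substitute `zfs send ...` for the
// producer and `gpg2 --encrypt ...` for the consumer as appropriate.
let producer = Process()
producer.launchPath = "/bin/echo"
producer.arguments = ["payload to archive"]

let consumer = Process()
consumer.launchPath = "/bin/gzip"
consumer.arguments = ["-c"]

let pipe = Pipe()
producer.standardOutput = pipe
consumer.standardInput = pipe

// Hand the consumer a FileHandle so its output goes straight to disk;
// the parent process never buffers the stream.
let outputPath = NSTemporaryDirectory() + "backup.gz"
FileManager.default.createFile(atPath: outputPath, contents: nil)
let outputFile = FileHandle(forWritingAtPath: outputPath)!
consumer.standardOutput = outputFile

producer.launch()
consumer.launch()
producer.waitUntilExit()
consumer.waitUntilExit()
outputFile.closeFile()
print("wrote \(outputPath)")
```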

I wanted to share the modifications in case anyone has any pointers. Or was just interested in the code.

import Dispatch
import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

let task1 = Process()
task1.launchPath = "/bin/cat"
task1.arguments = ["/dev/zero"]
task1.standardOutput = pipe1

let task2 = Process()
task2.launchPath = "/usr/bin/head"
task2.arguments = ["-c", "100000001"] // 100 MB plus one byte
task2.standardInput = pipe1
task2.standardOutput = pipe2

let global = DispatchQueue.global(qos: .background)
let channel = DispatchIO(type: .stream, fileDescriptor: pipe2.fileHandleForReading.fileDescriptor, queue: global) { (int) in
    DispatchQueue.main.async {
        print("Clean-up Handler: \(int)")
        exit(EXIT_SUCCESS)
    }
}

channel.setLimit(highWater: Int(PAGE_SIZE))

var bytes = 0
channel.read(offset: 0, length: Int.max, queue: global) { (closed, dispatchData, error) in
    if let data = dispatchData, !data.isEmpty {
        DispatchQueue.main.async {
            bytes += data.count
            print("Got data! \(data.count) of \(bytes)")
        }
    }

    if closed {
        channel.close()
    }
}

task1.launch()
task2.launch()

RunLoop.main.run()

This is quick and dirty, so I don't expect this to be the "final" code. There are probably some threading issues or reference cycles in here, but I thought I'd at least share it.
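For comparison, a similar streaming read can be done with FileHandle's readabilityHandler instead of DispatchIO, which sidesteps the channel machinery entirely (whether it behaves differently on Linux, I can't say). A sketch of the same cat | head experiment, with the size shrunk to ~1 MB:

```swift
import Dispatch
import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

let task1 = Process()
task1.launchPath = "/bin/cat"
task1.arguments = ["/dev/zero"]
task1.standardOutput = pipe1

let task2 = Process()
task2.launchPath = "/usr/bin/head"
task2.arguments = ["-c", "1000000"] // ~1 MB for the sketch
task2.standardInput = pipe1
task2.standardOutput = pipe2

var bytes = 0
let done = DispatchSemaphore(value: 0)
pipe2.fileHandleForReading.readabilityHandler = { handle in
    let data = handle.availableData
    if data.isEmpty {
        // Empty read means EOF: the writer side has closed.
        handle.readabilityHandler = nil
        done.signal()
    } else {
        bytes += data.count
    }
}

task1.launch()
task2.launch()
done.wait()
task2.waitUntilExit()
task1.terminate() // cat /dev/zero never exits on its own
task1.waitUntilExit()
print("read \(bytes) bytes")
```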

Unfortunately, it does not seem to work on Linux: it gets stuck on the last 257 bytes (e.g., Got data! 4096 of 99999744). It is not inconceivable that this is a bug in the Linux implementation of read, where the handler does not fire while the data remaining in the channel is less than highWater.
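If the stall really is a high-water interaction, one experiment that might isolate it (this is my speculation, not a confirmed diagnosis) is lowering the channel's low-water mark with setLimit(lowWater:), which permits the handler to fire for chunks smaller than highWater. A self-contained sketch using a plain Pipe and a 257-byte write, mirroring the stuck tail above:

```swift
import Dispatch
import Foundation

let pipe = Pipe()
let queue = DispatchQueue(label: "io")
let channel = DispatchIO(type: .stream,
                         fileDescriptor: pipe.fileHandleForReading.fileDescriptor,
                         queue: queue) { _ in }

// With a high-water mark of 4096 but a low-water mark of 1, the read
// handler is allowed to fire for chunks smaller than a full buffer.
channel.setLimit(highWater: 4096)
channel.setLimit(lowWater: 1)

var received = 0
let done = DispatchSemaphore(value: 0)
channel.read(offset: 0, length: Int.max, queue: queue) { closed, data, error in
    if let data = data { received += data.count }
    if closed {
        channel.close()
        done.signal()
    }
}

// Write 257 bytes (less than highWater) and close the write end.
pipe.fileHandleForWriting.write(Data(repeating: 0x41, count: 257))
pipe.fileHandleForWriting.closeFile()
done.wait()
print("received \(received) bytes")
```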


Yeah I’ve noticed issues with Pipe and Process on Linux as well.

Are you referring to SR-5773?

It kind of seems like exactly this issue.

Not exactly. When I filed that bug I was only using DispatchIO. The problem I was seeing with Pipe and Process was that reading from a pipe, as in my crappy Discord bot, works on macOS but fails consistently on Linux.

Maybe they are related to some underlying issue though.

Maybe I am misunderstanding you, but I'm using Pipe, Process, and DispatchIO too.

...
let global = DispatchQueue.global(qos: .background)
let channel = DispatchIO(type: .stream, fileDescriptor: pipe2.fileHandleForReading.fileDescriptor, queue: global) { (int) in
    DispatchQueue.main.async {
        print("Clean-up Handler: \(int)")
        exit(EXIT_SUCCESS)
    }
}
...

channel.read(offset: 0, length: Int.max, queue: global) { (closed, dispatchData, error) in
    ...
}
...

AFAICT they are identical.

Oh yes, for your use case they do look like the same issue. My earlier comment in this thread was about the example I linked, which doesn't use DispatchIO. Sorry for the confusion.

I think I've fixed the underlying issue in libdispatch; it has resolved the problem on my machine, at least. I've now run the code above with the patched libdispatch more than 10,000 times in a row without a failure. Without the patch it would fail about 3% of the time.

I've attached a patch file to the bug SR-5773. I'll be making a PR against swift-corelibs-libdispatch shortly. Update: I have made a PR against the swift-4.2-branch.

Just wanted to provide an update in case anyone was following this.


I was just stumbling into this same issue, wanting to read data off a long-running process in Swift on Linux, and found this thread. The PR seems to have hit a bit of a roadblock and the bug is still open, but I'm not familiar enough with the underlying code to understand what the reviewer was suggesting as a better solution.

The issue here is actually SR-9033 (Dispatch spins in a tight loop when receiving EPOLLHUP, issue #644 on apple/swift-corelibs-libdispatch). The proposed PR does not address the real issue but hides it (and it creates a 100% CPU spin, as Pierre points out on the PR).

SR-9033 is actually a serious bug in Dispatch on Linux that will affect anything that can produce an EPOLLHUP: UNIX pipes, UNIX domain sockets, and TCP sockets :frowning:.


Thank you, I appreciate the insight!

People keep saying that it causes 100% CPU, but my custom Foundation build with that patch in it has been streaming my ZFS backups to the cloud for the better part of a year, and my server never gets above 10% CPU usage while it does it. :man_shrugging:

I'd still love to fix the problem properly and get it mainlined. I really wish I could just use the community Docker image instead of rolling my own. I just don't know what else to do at this point; if someone wants to help me understand it further, I'd love to get it done.

The proper fix is now in: it's fixed on master and will ship in Swift 5.0.2.

Awesome! I'll go ahead and close the PR then. Thanks!