Pipe Child Processes Together

Thank you. No, seriously. Thank you. This is exactly what I was looking for.

What I really wanted to do was capture the binary output of a large, long-running process (e.g., zfs send), which could produce hundreds of GB, and pipe it into another process (e.g., gpg2 --encrypt) so I could then redirect the second process's output to a file.
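If the only goal is to end up with the second process's output in a file, one option might be to skip reading in the parent altogether and give the second process a FileHandle for the destination file as its standardOutput. The sketch below is an assumption about how that could look, not something I've run against zfs send or gpg2: /bin/cat /dev/zero and /usr/bin/head -c stand in for those two commands, and the output file name is hypothetical.

```swift
import Foundation

// Stand-ins: `cat /dev/zero` plays the role of `zfs send`, `head -c` plays `gpg2 --encrypt`.
// The output file name below is hypothetical.
let outputURL = URL(fileURLWithPath: NSTemporaryDirectory())
    .appendingPathComponent("pipeline-output.bin")
_ = FileManager.default.createFile(atPath: outputURL.path, contents: nil)  // create or truncate
let output = try FileHandle(forWritingTo: outputURL)

let pipe = Pipe()

let producer = Process()
producer.executableURL = URL(fileURLWithPath: "/bin/cat")
producer.arguments = ["/dev/zero"]
producer.standardOutput = pipe

let consumer = Process()
consumer.executableURL = URL(fileURLWithPath: "/usr/bin/head")
consumer.arguments = ["-c", "1000000"]
consumer.standardInput = pipe
consumer.standardOutput = output  // the child writes straight into the file

try producer.run()
try consumer.run()

// The data never passes through this process, so waiting is safe here.
consumer.waitUntilExit()
if producer.isRunning {
    producer.terminate()  // `cat /dev/zero` would otherwise never finish
}
producer.waitUntilExit()
output.closeFile()
print("head exited with status \(consumer.terminationStatus)")
```

With a real zfs send you would wait for the producer to finish on its own instead of terminating it.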

Because the first process produces so much output, I couldn't simply call waitUntilExit() and read everything afterwards. So I've modified your code to use DispatchIO to watch the read end of pipe2, i.e. the file descriptor that receives task2's standard output.
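For comparison, here is a minimal sketch without DispatchIO (my own variation, not your original code): drain pipe2 on the current thread with FileHandle.availableData, which blocks until bytes arrive and returns an empty Data at end of file, and only call waitUntilExit() after the loop. Calling waitUntilExit() before reading can hang, because once the pipe's kernel buffer fills, head blocks on its write and never exits.

```swift
import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

let task1 = Process()
task1.executableURL = URL(fileURLWithPath: "/bin/cat")
task1.arguments = ["/dev/zero"]
task1.standardOutput = pipe1

let task2 = Process()
task2.executableURL = URL(fileURLWithPath: "/usr/bin/head")
task2.arguments = ["-c", "100000001"]
task2.standardInput = pipe1
task2.standardOutput = pipe2

try task1.run()
try task2.run()

// Drain pipe2 before waiting: availableData blocks until some bytes arrive
// and returns an empty Data once every write end is closed (EOF).
let reader = pipe2.fileHandleForReading
var bytes = 0
while true {
    let chunk = reader.availableData
    if chunk.isEmpty { break }
    bytes += chunk.count  // a real pipeline would write `chunk` somewhere here
}

task2.waitUntilExit()
if task1.isRunning {
    task1.terminate()  // cat /dev/zero does not stop by itself
}
task1.waitUntilExit()
print("Read \(bytes) bytes")
```

The trade-off is that this ties up the calling thread, whereas the DispatchIO version leaves the main thread free.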

I wanted to share the modifications in case anyone has pointers, or is just interested in the code.

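```swift
import Dispatch
import Foundation

let pipe1 = Pipe()
let pipe2 = Pipe()

// task1 writes an endless stream of zero bytes into pipe1.
let task1 = Process()
task1.executableURL = URL(fileURLWithPath: "/bin/cat")
task1.arguments = ["/dev/zero"]
task1.standardOutput = pipe1

// task2 reads pipe1 and writes the first 100,000,001 bytes (~100 MB) into pipe2.
let task2 = Process()
task2.executableURL = URL(fileURLWithPath: "/usr/bin/head")
task2.arguments = ["-c", "100000001"]
task2.standardInput = pipe1
task2.standardOutput = pipe2

let global = DispatchQueue.global(qos: .background)

// Watch the read end of pipe2. The cleanup handler runs once the channel has been closed.
let channel = DispatchIO(type: .stream, fileDescriptor: pipe2.fileHandleForReading.fileDescriptor, queue: global) { error in
    DispatchQueue.main.async {
        print("Clean-up Handler: \(error)")
        exit(EXIT_SUCCESS)
    }
}

// Hand over data in chunks of at most one page (getpagesize() is available on Darwin and Glibc).
channel.setLimit(highWater: Int(getpagesize()))

var bytes = 0  // only touched on the main queue
channel.read(offset: 0, length: Int.max, queue: global) { done, dispatchData, error in
    if let data = dispatchData, !data.isEmpty {
        DispatchQueue.main.async {
            bytes += data.count
            print("Got data! \(data.count) of \(bytes)")
        }
    }

    // `done` is true at EOF or when the read failed (error != 0).
    if done || error != 0 {
        channel.close()
    }
}

try task1.run()
try task2.run()

RunLoop.main.run()
```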

This is quick-n-dirty, so I don't expect this to be the "final" code. There are probably some threading issues or reference cycles in here, but I thought I'd at least share it.

Unfortunately, it does not seem to work on Linux: it gets stuck on the last 257 bytes. The last line printed is "Got data! 4096 of 99999744", and 100,000,001 - 99,999,744 = 257. It is not inconceivable that this is a bug in the Linux implementation of DispatchIO's read, where the handler is not called while the amount of pending data is below the high-water mark.
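One way to narrow this down might be to take Process out of the picture. The sketch below is an assumption about a reasonable test, not a confirmed fix: it writes a known number of bytes into a Pipe from a background thread, closes the write end, and drains the read end through a stream DispatchIO channel with setLimit(lowWater: 1) and a one-page high-water mark. drain(fileDescriptor:chunkSize:) is a hypothetical helper name. The byte count is deliberately not a multiple of the page size, so the final chunk is smaller than highWater.

```swift
import Dispatch
import Foundation

/// Hypothetical helper: reads `fd` to EOF through a stream DispatchIO channel
/// and returns the number of bytes delivered to the read handler.
func drain(fileDescriptor fd: Int32, chunkSize: Int) -> Int {
    let queue = DispatchQueue(label: "drain")  // serial, so `total` needs no lock
    let finished = DispatchSemaphore(value: 0)
    var total = 0

    let channel = DispatchIO(type: .stream, fileDescriptor: fd, queue: queue) { _ in
        finished.signal()  // cleanup handler: the channel has let go of the descriptor
    }
    channel.setLimit(lowWater: 1)  // assumption: allow chunks smaller than highWater
    channel.setLimit(highWater: chunkSize)

    channel.read(offset: 0, length: Int.max, queue: queue) { done, data, error in
        if let data = data { total += data.count }
        if done || error != 0 { channel.close() }
    }

    finished.wait()
    return total
}

let pipe = Pipe()
let expected = 100_001  // not a multiple of 4096 or 16384

DispatchQueue.global().async {
    pipe.fileHandleForWriting.write(Data(count: expected))
    pipe.fileHandleForWriting.closeFile()  // the only write end, so the reader sees EOF
}

let received = drain(fileDescriptor: pipe.fileHandleForReading.fileDescriptor,
                     chunkSize: Int(getpagesize()))
print("received \(received) of \(expected) bytes")
```

If this also stalls on Linux, that would point at DispatchIO's handling of the final partial chunk; if it completes, the difference is more likely in how the Process pipeline's descriptors are set up.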
