Those 2 are quite similar, so let's try to tackle them. There are a few gotchas in there, so I think it would make an excellent example in the documentation. Though, I believe that @tera's "first subprocess to write in an async/await manner and what API could be used for that" is not within the Subprocess.run area of responsibility, so I will not cover it here.
I will use grep -o "Elizabeth" "Pride and Prejudice.txt" | wc -l as an example. grep -o prints every match on its own line and wc -l counts those lines, so this should count the number of times that the word "Elizabeth" appears in "Pride and Prejudice.txt" (or something like this, whatever…).
Option 1: Naive - be careful!
let pipe = try FileDescriptor.pipe()
let grep = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/grep")),
    arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
    output: .writeTo(pipe.writeEnd, closeWhenDone: true)
)
let wc = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/wc")),
    arguments: ["-l"],
    input: .readFrom(pipe.readEnd, closeWhenDone: true)
)
let result = wc.standardOutput.map { String(decoding: $0, as: UTF8.self) }
print(result ?? "?")
This works, it will print "645".
Deadlock 1: wc starts only after grep has finished. If grep fills the pipe it will deadlock: grep blocks on write because the pipe is full, and nobody is reading it because wc has not been started yet. This is the cat "Pride and Prejudice.txt" all over again.
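To see that a pipe really has a finite capacity, here is a minimal sketch that uses plain POSIX calls (not the Subprocess API from the proposal). It assumes a POSIX system (macOS or Linux) and puts the write end in non-blocking mode, so that a full pipe shows up as EAGAIN instead of hanging the program the way grep would hang. The exact capacity depends on the operating system, so it is printed rather than assumed.

```swift
import Foundation

// Create an anonymous pipe: fds[0] is the read end, fds[1] is the write end.
var fds: [Int32] = [0, 0]
precondition(pipe(&fds) == 0, "pipe() failed")
let readEnd = fds[0]
let writeEnd = fds[1]

// Make the write end non-blocking, so a full pipe reports EAGAIN
// instead of blocking this process forever (which is what grep would do).
let flags = fcntl(writeEnd, F_GETFL)
precondition(fcntl(writeEnd, F_SETFL, flags | O_NONBLOCK) == 0, "fcntl() failed")

// Keep writing 1 KiB chunks while nobody reads from the other end.
let chunk = [UInt8](repeating: 0x41, count: 1024)
var bytesAccepted = 0
var lastError: Int32 = 0
while true {
    let n = write(writeEnd, chunk, chunk.count)
    if n < 0 {
        lastError = errno // Captured right away, before any other call.
        break
    }
    bytesAccepted += n
}

print("The pipe accepted \(bytesAccepted) bytes before it was full")
print("Full pipe reported EAGAIN: \(lastError == EAGAIN)")

close(readEnd)
close(writeEnd)
```

With a blocking write end (the default), the write that failed with EAGAIN would simply never return, which is the Deadlock 1 situation.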
Deadlock 2: If we change the grep to:
// Change the 'grep', everything else is the same.
let grep = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/grep")),
    arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
    output: .writeTo(pipe.writeEnd, closeWhenDone: false) // <----- HERE
)
I just changed closeWhenDone: true to false (user mistake). Because pipe.writeEnd never gets closed, wc cannot finish, as it is still possible that somebody will write something in there. This would be equivalent to:
let pipe = try FileDescriptor.pipe()
// Write something to 'pipe.writeEnd', but DO NOT close it.
let wc = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/wc")),
    arguments: ["-l"],
    input: .readFrom(pipe.readEnd, closeWhenDone: true)
)
// (Code here never executes.)
Side-note: 1 week ago this would always deadlock, because the PR used posix_spawn_file_actions_adddup2 to send the file to a child without closing it in the parent process. Even if the child closed the file, it would still be open in the parent, so the reader would wait forever. The rule is: as long as any writeEnd is open, read will never return 0 (read() == 0 means end of file).
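The rule can be checked with a small sketch that uses plain POSIX calls (again, not the Subprocess API). It assumes a POSIX system and uses a non-blocking read end only so that the "write end still open" case can be observed without hanging. A normal blocking read would just wait there.

```swift
import Foundation

// Create an anonymous pipe: fds[0] is the read end, fds[1] is the write end.
var fds: [Int32] = [0, 0]
precondition(pipe(&fds) == 0, "pipe() failed")
let readEnd = fds[0]
let writeEnd = fds[1]

// Non-blocking reads let us observe "no data yet" without hanging.
let flags = fcntl(readEnd, F_GETFL)
precondition(fcntl(readEnd, F_SETFL, flags | O_NONBLOCK) == 0, "fcntl() failed")

let capacity = 16
var buffer = [UInt8](repeating: 0, count: capacity)

// 1. The pipe is empty, but the write end is still open.
//    This is NOT end of file: read fails with EAGAIN ("try again later").
let whileOpen = read(readEnd, &buffer, capacity)
let whileOpenErrno = errno
print("write end open:   read = \(whileOpen), EAGAIN = \(whileOpenErrno == EAGAIN)")

// 2. Close the only write end. Now nobody can ever write again,
//    so read reports end of file by returning 0.
close(writeEnd)
let afterClose = read(readEnd, &buffer, capacity)
print("write end closed: read = \(afterClose)")

close(readEnd)
```

If a second copy of the write end were still open anywhere (for example in the parent process), the second read would behave like the first one.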
This was fixed, so it should not matter. What matters is this: since we close the parent writeEnd and the child also closes its end, it is possible to get read() == 0. If I'm correct, it would be possible to redefine CollectedOutputMethod.collect from:
Collect the output as Data with the default 16kb limit. Possible deadlock.
To:
Collect all of the Data. Never deadlock.
Personally I like the new definition, so much that I would make it a default argument for both stdout/stderr.
Option 2: Nested compose - suboptimal
Now we know that we have to run both of the processes at the same time. We can either nest or use withTaskGroup. Let's start with nesting.
let runResult = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/grep")),
    arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
    output: .redirect
) { p in
    return try await Subprocess.run(
        executing: .at(FilePath("\(bin)/wc")),
        arguments: ["-l"],
        input: p.standardOutput! // YOLO. It's not like I can handle 'nil' meaningfully anyway.
    )
}
let result = String(decoding: runResult.value.standardOutput!, as: UTF8.self)
print(result)
This will work. As far as I know it will not deadlock. It looks pretty, because we used input: p.standardOutput! for the wc. It composes! In practice this is a bit sub-optimal. To see why, let's do a tiny renaming:
let runResult = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/grep")),
    arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
    output: .writeToParentProcess // <---- HERE, it was 'redirect'
) { p in
    // Same as above…
}
I renamed RedirectedOutputMethod.redirect to .writeToParentProcess to make it more clear. What we are doing is:
- CHILD-1 writes to pipe
- PARENT reads CHILD-1 pipe in AsyncBytes
- PARENT writes to CHILD-2 pipe
- CHILD-2 reads the pipe
You can validate this by modifying the AsyncBytes.Iterator.next method to print every time it is called.
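The extra hop through the parent can be sketched with two plain POSIX pipes inside a single process. This is only an illustration of the data path, not how the PR implements it: the "children" are simulated by writing to and reading from the pipes directly, and makePipe is a helper invented for this example.

```swift
import Foundation

/// Hypothetical helper for this example: creates a pipe and returns both ends.
func makePipe() -> (readEnd: Int32, writeEnd: Int32) {
    var fds: [Int32] = [0, 0]
    precondition(pipe(&fds) == 0, "pipe() failed")
    return (fds[0], fds[1])
}

let fromChild1 = makePipe() // CHILD-1 -> PARENT
let toChild2 = makePipe()   // PARENT -> CHILD-2

// Stand-in for CHILD-1: write some output to its pipe, then close it.
let produced = Array("Elizabeth\nElizabeth\nElizabeth\n".utf8)
precondition(write(fromChild1.writeEnd, produced, produced.count) == produced.count)
close(fromChild1.writeEnd)

// PARENT: every byte is read into the parent's memory and written out again.
let capacity = 8 // Deliberately tiny, so the loop runs more than once.
var buffer = [UInt8](repeating: 0, count: capacity)
var relayedBytes = 0
var parentReads = 0
while true {
    let n = read(fromChild1.readEnd, &buffer, capacity)
    if n <= 0 { break } // 0 means end of file: CHILD-1 closed its end.
    precondition(write(toChild2.writeEnd, buffer, n) == n)
    relayedBytes += n
    parentReads += 1
}
close(fromChild1.readEnd)
close(toChild2.writeEnd) // Without this, CHILD-2 would never see end of file.

// Stand-in for CHILD-2: read everything that the parent forwarded.
var received: [UInt8] = []
while true {
    let n = read(toChild2.readEnd, &buffer, capacity)
    if n <= 0 { break }
    received.append(contentsOf: buffer[0..<n])
}
close(toChild2.readEnd)

print("parent relayed \(relayedBytes) bytes in \(parentReads) reads")
print("CHILD-2 received the same bytes: \(received == produced)")
```

The parent does real work for every chunk here. Connecting the two children with a single pipe removes that copy.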
I'm not exactly sure if I like the Subprocess.output being composable with Subprocess.input (via AsyncBytes). But it will work. And, as far as I can tell, it should not deadlock unless we apply some additional buffering in AsyncBytes and there are some VERY specific requirements. Then: maybe.
Bike shedding: I'm not a fan of the redirect name. For me it fails in the following ways:
- redirect where?
- what do I gain from redirect?
If we renamed it as [write/pipe]To[Parent/Current]Process then I think it would be more clear what is happening:
- where do we redirect? To [Parent/Current]Process.
- what do I gain from redirect? We can interact with it in the current process.
public struct RedirectedOutputMethod: Sendable, Hashable {
    public static var discard: Self
    public static var writeToParentProcess: Self
    public static func writeToFile(_ fd: FileDescriptor, close: Bool) -> Self
}
We can:
- rename noInput to none - this would be Subprocess.run(…, input: .none) for the callers
- make it NilLiteralConvertible (ExpressibleByNilLiteral in current Swift)
- leave as it is
No strong opinion here, I don't like bike shedding.
Option 3: Nested with pipe - works
Same as above, but we create a pipe to connect the children.
let pipe = try FileDescriptor.pipe()
let runResult = try await Subprocess.run(
    executing: .at(FilePath("\(bin)/grep")),
    arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
    output: .writeTo(pipe.writeEnd, closeWhenDone: true)
) { p in
    return try await Subprocess.run(
        executing: .at(FilePath("\(bin)/wc")),
        arguments: ["-l"],
        input: .readFrom(pipe.readEnd, closeWhenDone: true)
    )
}
let result = String(decoding: runResult.value.standardOutput!, as: UTF8.self)
print(result)
It works.
Option 4: Task group
let result = try await withThrowingTaskGroup(of: String.self, returning: String.self) { group in
    let pipe = try FileDescriptor.pipe()
    group.addTask {
        try await Subprocess.run(
            executing: .at(FilePath("\(bin)/grep")),
            arguments: ["-o", "Elizabeth", "Pride and Prejudice.txt"],
            output: .writeTo(pipe.writeEnd, closeWhenDone: true)
        )
        return ""
    }
    group.addTask {
        let r = try await Subprocess.run(
            executing: .at(FilePath("\(bin)/wc")),
            arguments: ["-l"],
            input: .readFrom(pipe.readEnd, closeWhenDone: true)
        )
        return String(decoding: r.standardOutput!, as: UTF8.self)
    }
    var result = ""
    for try await partial in group {
        if !partial.isEmpty {
            result = partial
        }
    }
    return result
}
print(result)
It works. It can be simplified, but whatever…
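One possible simplification, as a rough sketch: async let runs both calls concurrently without the empty-string placeholder and the result loop. To keep the example self-contained, runGrep and runWordCount are hypothetical stand-ins for the two Subprocess.run calls above, and the returned "645\n" is a hard-coded placeholder, not real wc output. The top-level await assumes Swift 5.7 or newer (main.swift or a script).

```swift
// Hypothetical stand-in for the 'grep' call from Option 4.
func runGrep() async throws {
    // Would run grep with output: .writeTo(pipe.writeEnd, closeWhenDone: true).
}

// Hypothetical stand-in for the 'wc' call from Option 4.
func runWordCount() async throws -> String {
    // Would run wc with input: .readFrom(pipe.readEnd, closeWhenDone: true)
    // and decode its standard output. The value below is a placeholder.
    return "645\n"
}

func countMatches() async throws -> String {
    // Both child tasks start right away and run concurrently.
    async let grep: Void = runGrep()
    async let count = runWordCount()

    // Wait for both; an error thrown by either call is rethrown here.
    try await grep
    return try await count
}

let result = try await countMatches()
print(result, terminator: "")
```

The task group version is still the better fit when the number of processes in the pipeline is not known at compile time.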