[Pitch] Swift Subprocess

(This post is quite long; you may want to read it on GitHub.)

I don't use Swift Concurrency in day-to-day work and I'm not exactly familiar with fork internals (I always used wrappers), but for fun I wrote this test repo. It is basically the Process from the old Foundation upgraded to async/await, and it is very similar to Python's asyncio.subprocess. I will refer to some parts of the code later, so here is the gist of how it works:

let process = try Subprocess(
  executablePath: "/usr/bin/cat",
  arguments: ["Pride and Prejudice.txt"],
  stdout: .pipe
)

let s = try await process.stdout.readAll(encoding: .utf8)
print(s ?? "<decoding-failed>")

// In 1 expression:
let s2 = try await Subprocess(…).stdout.readAll(encoding: .utf8)

This will start the process, read the whole stdout, waitpid it (in the background) and then close the files. A more interesting example would be:

let process = try Subprocess(executablePath: "/usr/bin/sleep", arguments: ["3"])

Task.detached {
  try await Task.sleep(nanoseconds: 1 * second)
  try await process.kill()
}

let status = try await process.wait()
assert(status == -9)

wait is just a synchronization point. Cancelling a Task that waits only cancels this wait and does nothing to the process. You can use process.terminateAfter(body:) if you need scoped lifetime - this is exactly the same thing as fileDescriptor.closeAfter(body:). More examples available here.

Now let's go back to the proposal.

waitpid

The proposal does not exactly specify how waitpid happens, so the only thing that I have is the proof of concept:

extension Subprocess.Configuration {
  public func run<R>(…) async throws -> Result<R> {
    // (…)
    return try await withThrowingTaskGroup(of: RunState<R>.self) { group in
      @Sendable func cleanup() throws { /* Close files */ }

      group.addTask { waitpid(pid.value, &status, 0) }
      group.addTask {
        // Call the closure provided by the user. I removed uninteresting stuff.
        try await body()
        try cleanup()
      }

      while let state = try await group.next() {
        // Get the 'result' and 'terminationStatus'
      }
    }
  }
}

We have a blocking waitpid call on the cooperative thread. Tbh. even from the proof of concept I expected at least waitpid(WNOHANG) + Task.sleep() in a loop. I tried to start a few ffmpeg processes, and it didn't work.

Can the proposal at least mention that this is not how it will work in the real implementation?

I outlined a few solutions here with a link to CPython implementation.
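For illustration, the waitpid(WNOHANG) + Task.sleep() loop mentioned above could look roughly like this. It is a minimal sketch, not the proposal's or my library's implementation: waitForExit, decodeWaitStatus, WaitError and the 50 ms poll interval are all made-up names and values. The status is decoded by hand (low 7 bits = signal, next byte = exit code) to match the "-9 for SIGKILL" convention from the example at the top.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

/// Thrown when `waitpid` itself fails (for example with ECHILD). Hypothetical type.
struct WaitError: Error {
  let code: Int32
}

/// Maps a raw `waitpid` status to: exit code for a normal exit,
/// negated signal number when the child was terminated by a signal.
func decodeWaitStatus(_ status: Int32) -> Int32 {
  let signal = status & 0x7f
  return signal == 0 ? (status >> 8) & 0xff : -signal
}

/// Polls the child instead of blocking in `waitpid`.
/// `Task.sleep` throws on cancellation, which cancels only the wait, not the child.
func waitForExit(pid: pid_t, pollIntervalNanoseconds: UInt64 = 50_000_000) async throws -> Int32 {
  var status: Int32 = 0
  while true {
    let result = waitpid(pid, &status, WNOHANG)
    if result == pid {
      return decodeWaitStatus(status)
    }
    if result == -1 {
      let code = errno
      if code != EINTR { throw WaitError(code: code) }
    }
    // result == 0 means the child is still running: give the thread back and retry later.
    try await Task.sleep(nanoseconds: pollIntervalNanoseconds)
  }
}
```

Polling trades latency for simplicity: the exit is noticed up to one interval late, but no cooperative thread is ever parked inside waitpid.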

IO - Chatty children

Imagine this scenario:

  1. Child process starts with stdout set to pipe
  2. Child writes to stdout
  3. Child writes some more…
  4. Deadlock

How? Pipes are backed by a fixed-size buffer: once it is full and nobody reads from it, write blocks until somebody does. With the parent sitting in waitpid and no reader, this is a deadlock.

The proposal does not mention this situation.

Though if we take the code from the proof of concept at face value then as soon as the body finishes we will close the reading end of the pipe. This will SIGPIPE or EPIPE in the child. Well… you can't deadlock if you crash it first. Is that intended?

A more "human" approach would be to read all of the nonsense that the child writes and then waitpid. Or do it in parallel: 1 Task reads and the 2nd waits.

Python documentation explicitly warns about this situation for wait, that's why they recommend communicate for pipes.

Examples using the code from my repository (you can easily translate it to Python with Popen):

  • :red_circle: deadlock - pipe is full and the child is blocked

    let process = try Subprocess(
      executablePath: "/usr/bin/cat",
      arguments: ["Pride and Prejudice.txt"],
      stdout: .pipe
    )
    
    let status = try await process.wait() // Deadlock
    
  • :green_circle: works - we read the whole stdout before waiting

    let process = try Subprocess( /* cat "Pride and Prejudice.txt" */ )
    let s = try await process.stdout.readAll(encoding: .utf8)
    let status = try await process.wait()
    
  • :green_circle: works - it reads stdout/stderr in parallel, so we do not deadlock on any of them

    let process = try Subprocess( /* cat "Pride and Prejudice.txt" */ )
    let status = try await process.readAllFromPipesAndWait() // 'communicate' in Python
    

IO - Pipe buffer size

On Linux you can use fcntl with the following arguments:

  • F_GETPIPE_SZ - get pipe buffer size
  • F_SETPIPE_SZ - set pipe buffer size; conditions apply

It may not be portable.

On my machine (Ubuntu 22.04.4 LTS) the default is 65536. This is a lot, but still not enough to store the whole "Pride and Prejudice". You can just cat the whole thing for testing.

In theory we could allow users to specify the size of the buffer. I do not know the use case for this, and I'm not sure if anybody will ever complain if we don't. A long time ago I discovered that the more advanced features you use, the more things break. Anyway, if we want this then remember to ignore EBUSY.

I played with pipes here to discover how they work, so you may want to check this out. Linux only.
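A small sketch of the F_GETPIPE_SZ query, Linux only. The helper name defaultPipeBufferSize is made up, and the command number is spelled out as a literal (1032, which is F_LINUX_SPECIFIC_BASE + 8 in the Linux headers) in case the constant is not exported to Swift on a given toolchain. On other systems the function simply returns nil.

```swift
#if canImport(Glibc)
import Glibc
#elseif canImport(Darwin)
import Darwin
#endif

/// Returns the capacity in bytes of a freshly created pipe, or nil when the query is unsupported.
func defaultPipeBufferSize() -> Int32? {
  #if os(Linux)
  // Numeric value of F_GETPIPE_SZ; written as a literal on purpose (see the text above).
  let getPipeSizeCommand: Int32 = 1032

  var fds: [Int32] = [0, 0]
  guard pipe(&fds) == 0 else { return nil }
  defer {
    close(fds[0])
    close(fds[1])
  }

  let size = fcntl(fds[0], getPipeSizeCommand)
  return size == -1 ? nil : size
  #else
  return nil
  #endif
}
```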

IO - Blocking

In "IO - Chatty children" I promised that it will not deadlock. I lied.

Tbh. I do not understand everything in AsyncBytes, but:

buffer.readFunction = { (buf) in
  // …
  let bufPtr = UnsafeMutableRawBufferPointer(start: buf.nextPointer, count: capacity)
  let readSize: Int = try await IOActor().read(from: file, into: bufPtr)
}

final actor IOActor {
  func read(from fd: Int32, into buffer: UnsafeMutableRawBufferPointer) async throws -> Int {
    while true {
      let amount = Darwin.read(fd, buffer.baseAddress, buffer.count)
      // …
    }
  }

  func read(from fileDescriptor: FileDescriptor, into buffer: UnsafeMutableRawBufferPointer) async throws -> Int {
    // this is not incredibly effecient but it is the best we have
    let readLength = try fileDescriptor.read(into: buffer)
    return readLength
  }
}

I'm not exactly sure what try await IOActor().read(from: file, into: bufPtr) does. It does not matter, I will just skip it.

Anyway, we have fileDescriptor.read and Darwin.read on cooperative threads. Both calls block. So currently we have:

  • Task1 does waitpid
  • Task2 waits for read

But when we read something it will unblock. Surely it will not deadlock. Right?

Right? :sob:

Right? :sob::sob:

  • Task1 does waitpid
  • Task2 waits for read
  • Process writes to stderr, fills the buffer and waits for somebody to read it

We are down 2 cooperative threads and we have a deadlock with the child process. Ouch…

Is there some prevention method for this? The proposal does not mention it. In the code we have "this is not incredibly effecient but it is the best we have" comment, but I do not know what it means.

A lot of people say that you should not block on the cooperative thread. I don't agree. I think you are perfectly fine with blocking (within reason…), and I would take it any day over an over-engineered solution that does not work.

That said if we really wanted to over-engineer this:

  • Option A: Threads - spin a thread for every read, then resume the Continuation. On cancellation (which we have to support!) just kill the thread.

    This is actually how waitpid works in my library. You start a process, I create a waitpid thread. Once the child terminates it sends the message to Subprocess actor which closes the files and resumes wait continuations (btw. they support cancellation, this is not as trivial as one may think).

    Threads are expensive, but in theory you can just have 1 thread for all IO and just poll the files. You can even monitor termination with Linux pid file descriptors, which gives us 1 thread for all of the Subprocess needs.

  • Option B: Non blocking IO - this is what I do for IO. For files you open them with O_NONBLOCK, for pipes you set it later (btw. there is a race condition in there).

    Reads from an empty pipe return EWOULDBLOCK/EAGAIN. In my implementation I just return nil as readByteCount, so that the user knows that nothing happened.

    If they really need to read something then they can use poor-person synchronization and do Task.sleep. Tbh. this is exactly what I do when the user calls readAll. Is this the best thing ever? No. But it does not block. It is cooperative. And it supports cancellation.

    Writes that would block also return EWOULDBLOCK/EAGAIN, but it is a little bit trickier, so read the POSIX docs. A write with n > PIPE_BUF may be partial, so user data is lost if the returned byte count is ignored. This is what the spec says, but it also lists the solutions:

    • we can implement it on our side, but this is OS specific; on Linux you just split in PIPE_BUF chunks
    • or just go with Jaws/TopGear logic and let the users create a bigger pipe
    • or just mention O_NONBLOCK in documentation, and let the users deal with it

At this point we arrive at the (unsurprising) conclusion: Subprocess should be 100% Swift concurrency neutral: it does not block and it respects cancellation. The library that I wrote achieves this in the following ways:

  • waitpid is done in a separate thread:
    • when the process finishes naturally -> OS notifies the waitpid thread
    • SIGKILL -> our code sends signal -> child terminates -> OS notifies the waitpid thread
  • all IO is done with O_NONBLOCK
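To make the O_NONBLOCK part concrete, here is a rough sketch of the idea from Option B: "would block" is reported as nil, and readAll just sleeps and retries. It is written from scratch for this post, so setNonBlocking, readNonBlocking, readAll, IOError and the 10 ms sleep are illustrative names and values, not the code from my repository. Only EAGAIN is checked because EWOULDBLOCK has the same value on Linux and macOS.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

struct IOError: Error {
  let code: Int32
}

/// Switches an already open descriptor (for example a pipe end) to non-blocking mode.
func setNonBlocking(_ fd: Int32) throws {
  let flags = fcntl(fd, F_GETFL)
  guard flags != -1, fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1 else {
    throw IOError(code: errno)
  }
}

/// Returns the number of bytes read, 0 at end of file, or nil when the read would block.
func readNonBlocking(from fd: Int32, into buffer: UnsafeMutableRawBufferPointer) throws -> Int? {
  while true {
    let count = read(fd, buffer.baseAddress, buffer.count)
    if count >= 0 { return count }
    let code = errno
    if code == EAGAIN { return nil }
    if code != EINTR { throw IOError(code: code) }
    // EINTR: interrupted by a signal before any data arrived, retry.
  }
}

/// Reads until end of file, sleeping cooperatively whenever no data is available.
/// `Task.sleep` throws on cancellation, so the loop is cancellable.
func readAll(from fd: Int32) async throws -> [UInt8] {
  let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: 4096, alignment: 1)
  defer { buffer.deallocate() }

  var result: [UInt8] = []
  while true {
    switch try readNonBlocking(from: fd, into: buffer) {
    case .some(0):
      return result
    case .some(let count):
      result.append(contentsOf: buffer[..<count])
    case .none:
      try await Task.sleep(nanoseconds: 10_000_000)
    }
  }
}
```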

IO - posix_spawn_file_actions_adddup2

With this everything should… Not yet! We have an infinite loop!

public struct CollectedOutputMethod: Sendable, Hashable {
  // Collect the output as Data with the default 16kb limit
  public static var collect: Self
  // Collect the output as Data with modified limit
  public static func collect(limit limit: Int) -> Self
}

Why do I have to provide a limit (implicit 16kb or explicit)? I just want all! Is Int.max good for this? How about Int.min? -1? Tough luck: all of them will crash with SIGILL: illegal instruction operand.

Btw. what is the unit for limit? The proposal says: "By default, this limit is 16kb (when specifying .collect).". But in code we have return .init(method: .collected(128 * 1024)). The parameter should be named limitInBytes or byteCount instead.

Anyway, you are misusing the API. This works:

let (readEnd, writeEnd) = try FileDescriptor.pipe()
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: 10, alignment: 1)
try writeEnd.close()

// When 'read' returns 0 it means end of the file.
while try readEnd.read(into: buffer) != 0 {
  // Code here never executes!
}

This is an infinite loop:

let (readEnd, writeEnd) = try FileDescriptor.pipe()
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: 10, alignment: 1)
let writeEnd2 = dup(writeEnd.rawValue) // <-- I added this
try writeEnd.close()

while try readEnd.read(into: buffer) != 0 {
  // Infinite loop.
}

You are doing the 2nd one. With posix_spawn_file_actions_adddup2 you send the file to the child, but you also have it in the parent process. The rule is: as long as any copy of writeEnd is open, read will never return 0.

This combined with blocking reads means that our cooperative thread is down. But surely when the process exits it will unlock the thread? No. The writeEnd is closed after the body ends and we are inside the body. This is why everything should be async: still an infinite loop, but at least it is cooperative.

Anyway, you are supposed to close the child ends in the parent process. This is how in my implementation I can:

let s2 = try await Subprocess( /* cat "Pride and Prejudice.txt" */ ).stdout.readAll(encoding: .utf8)

It will read all, without any limits. We could allow users to specify some limit, but it should be an option, not a mandatory API limitation.

IO - Default values

In the proposal we have:

public static func run<R>(
  …,
  input: InputMethod = .noInput,
  output: RedirectedOutputMethod = .redirect, // <-- here
  error: RedirectedOutputMethod = .discard,
  _ body: (@Sendable @escaping (Subprocess) async throws -> R)
) async throws -> Result<R>

redirect will create a pipe. If the user forgets to read it (or they are not interested) we may deadlock when the pipe buffer becomes full. Isn't discard a better default value?

let example1 = try await Subprocess.run(
  executing: .named("ls")
) { … }

let example2 = try await Subprocess.run(
  executing: .named("ls"),
  output: .redirect
) { … }

In example2 you can clearly see that something is happening. The framework did not decide this for us. It was our own decision, and it is very visible to the readers.

Btw. discard writes to /dev/null. As far as I know you can get away with only 1 discard file opened with O_RDWR. This way you use fewer resources. May matter on servers.
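A quick way to check the single-descriptor idea: open /dev/null once with O_RDWR and use the same descriptor in both directions. The helper names below are made up for this sketch.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

/// Opens /dev/null once for both reading and writing. Returns -1 on failure.
func openDiscardFile() -> Int32 {
  return open("/dev/null", O_RDWR)
}

/// Writes a few bytes to the descriptor and then reads from the very same descriptor.
/// For /dev/null the write is swallowed and the read reports end of file.
func probeDiscardFile(_ fd: Int32) -> (written: Int, read: Int) {
  let payload: [UInt8] = [1, 2, 3]
  let written = write(fd, payload, payload.count)

  var scratch = [UInt8](repeating: 0, count: 8)
  let readCount = read(fd, &scratch, scratch.count)
  return (written, readCount)
}
```

So one descriptor could be dup2-ed onto stdin, stdout and stderr of the child instead of opening /dev/null up to three times.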

AsyncBytes - Optional output props

public struct Subprocess: Sendable {
  public var standardOutput: AsyncBytes?
  public var standardError: AsyncBytes?
}

Those properties are only usable if we specified .redirect. Why does the positive path require optional unwrap with !?

try await Subprocess.run(
  executing: .named("ls"),
  output: .redirect
) { process in
  process.standardOutput!.<something>
}

This ! is not needed. AsyncBytes should store FileDescriptor? and throw EBADF (or precondition) if we try to use it when the argument was not .redirect.
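Roughly what I have in mind, as a sketch with hypothetical types (OutputBytes stands in for AsyncBytes, ProcessHandle for Subprocess; neither is the proposal's real declaration). The optional moves inside the type, and misuse becomes a thrown EBADF instead of a crash on !.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

/// Error carrying an errno value. Hypothetical, not part of the proposal.
struct DescriptorError: Error, Equatable {
  let code: Int32
}

/// Hypothetical stand-in for AsyncBytes; only the descriptor handling is sketched.
struct OutputBytes {
  /// nil when the stream was not configured with `.redirect`.
  private let fileDescriptor: Int32?

  init(fileDescriptor: Int32?) {
    self.fileDescriptor = fileDescriptor
  }

  /// Every read would start here, so misuse surfaces as a thrown EBADF.
  func requireDescriptor() throws -> Int32 {
    guard let fd = fileDescriptor else { throw DescriptorError(code: EBADF) }
    return fd
  }
}

/// Hypothetical process handle: the property is always present, no `!` at the call site.
struct ProcessHandle {
  let standardOutput: OutputBytes
}
```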

AsyncBytes - Read buffer

public struct AsyncBytes: AsyncSequence, Sendable {
  public typealias Element = UInt8
  public typealias AsyncIterator = Iterator

  public func makeAsyncIterator() -> Iterator
}

The only thing we have here is AsyncIterator. What if I wanted to write a StringOutput type that deals with all of the String stuff in a Java style decorator fashion:

let stdout = StringOutput(process.standardOutput!, encoding: .utf8)

for await line in stdout.lines() {
  …
}

I'm sure everybody knows what StringOutput internals would look like.

Why do I need to read byte-by-byte? Can't I just read the whole buffer of data? This just forces me to write the code that undoes what AsyncBytes did. It applies an abstraction (including its own buffering) that I have not asked for. And now we need 2 buffers: AsyncBytes one and the one where I gather my String.
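For comparison, this is how small the line-splitting part of such a StringOutput gets when it is fed whole chunks. LineSplitter is a made-up type for this sketch, and it assumes UTF-8 with "\n" line endings. Splitting on the byte 0x0A is safe there because that byte never appears inside a multi-byte UTF-8 sequence.

```swift
/// Accumulates raw chunks and hands back complete lines. Hypothetical helper.
struct LineSplitter {
  /// Bytes received after the last newline seen so far.
  private var pending: [UInt8] = []

  /// Appends a chunk and returns every line that is now complete (without the newline).
  mutating func feed<C: Collection>(_ chunk: C) -> [String] where C.Element == UInt8 {
    pending.append(contentsOf: chunk)

    var lines: [String] = []
    while let newline = pending.firstIndex(of: 0x0A) {
      lines.append(String(decoding: pending[..<newline], as: UTF8.self))
      pending.removeSubrange(...newline)
    }
    return lines
  }

  /// Call at end of file: returns the trailing text that had no final newline, if any.
  mutating func finish() -> String? {
    guard !pending.isEmpty else { return nil }
    defer { pending.removeAll() }
    return String(decoding: pending, as: UTF8.self)
  }
}
```

With a chunk-based read API this is the only buffer involved; on top of a byte-by-byte AsyncSequence the same bytes pass through two.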

StandardInputWriter - Call finish?

For StandardInputWriter the proposal states:

Note: Developers must call finish() when they have completed writing to signal that the standard input file descriptor should be closed.

Why?

We are basically racing the process against the user code (body argument).

  • process finishes 1st - reading end of the pipe is closed -> SIGPIPE/EPIPE, but this has nothing to do with closing the file
  • user code (body) finishes 1st - can't we close it after? If the process is still running then it will get values from the buffer, and eventually read(…) == 0

I guess in theory we could have a subprocess that reads from the stdin in a loop, so if we do not close the pipe it will loop forever. But for this use case we require all of the users to call finish()?

Why is it called finish instead of close?

Why can't the Subprocess own the input? Is there any situation where the input outlives the Subprocess?

We could make it idempotent where the 1st close would set the FileDescriptor to nil (to prevent double-closing). Then the Subprocess would call close again, just in case the user forgot. This will make finish/close optional for the users. They can, but they do not have to.
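A sketch of the idempotent close, with made-up names (InputHandle, closeIfNeeded, CloseError). It is a plain class to keep the example short; it is not thread-safe, so in a real implementation this state would live inside an actor.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

struct CloseError: Error {
  let code: Int32
}

/// Hypothetical owner of the stdin descriptor.
final class InputHandle {
  /// nil once the descriptor has been closed.
  private var fileDescriptor: Int32?

  init(fileDescriptor: Int32) {
    self.fileDescriptor = fileDescriptor
  }

  var isClosed: Bool { fileDescriptor == nil }

  /// Safe to call any number of times: only the first call reaches the OS.
  func closeIfNeeded() throws {
    guard let fd = fileDescriptor else { return }
    // Forget the descriptor before closing, so that a failed close is never
    // retried on a number that the OS may already have handed out again.
    fileDescriptor = nil
    if close(fd) == -1 { throw CloseError(code: errno) }
  }
}
```

The user may call it when they are done writing, and the Subprocess calls it again after the body returns; whichever comes second is a no-op.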

I do not like APIs based on the "user has to remember to do X". At least back in the RxSwift times we had DisposeBag and forgetting about it was a compiler warning. Here we get nothing.

StandardInputWriter - Sendable args?

public struct StandardInputWriter: Sendable {

  private let actor: StandardInputWriterActor

  @discardableResult
  public func write<S>(_ sequence: S) async throws -> Int where S : Sequence, S.Element == UInt8

  @discardableResult
  public func write<S: AsyncSequence>(_ asyncSequence: S) async throws -> Int where S.Element == UInt8
}

Those Sequences will go to the actor. Shouldn't they be Sendable? I get concurrency warning for try await Array(asyncSequence).

I think that any green-field project should enable StrictConcurrency by default. People may say something about false-positives, or that this feature is not yet ready. For me it catches bugs. And it creates the right mental model of how things work, so that we do not have to "unlearn" later.

IO - StandardInputWriter - write names

Bike-shedding: on FileDescriptor, the methods that take a Sequence are called writeAll. On StandardInputWriter (which is a wrapper around a FileDescriptor) we have write.

Do we want more consistent naming?

IO - File ownership for close

@beaumont wrote:

Because .readFrom(:) and .writeTo(:) both take FileDescriptor I tab-completed my way into a footgun, by passing .standardInput, .standardOutput, and .standardOutput to input, output, and error respectively. This of course was silly for me to do, because they got closed after the process exited.

I was thinking about the same thing! But it may be a little bit more complicated:

  • single shouldCloseFiles switch - as soon as you set it to false you have to close ALL of the files by yourself. Obviously, it is really easy to forget to do so.
  • separate shouldClose per file - you have to remember which files you have to close, and closing them twice may be an error. So is forgetting to close one. I went this way in my implementation.

The real semantics that we are trying to express are: move = close, borrow = do nothing. If we ignore all of the enums and the overload explosion we would have:

func run(…, stdout: borrowing File, …) async throws -> CollectedResult
func run(…, closingStdout: consuming File, …) async throws -> CollectedResult

The consuming version would call the borrowing one and then close the file. Obviously deinit would not call close. It is the other way around: consuming func close() will end the lifetime and deinit does nothing.

If we go back to reality (column-wise):

InputMethod            CollectedOutputMethod  RedirectedOutputMethod
noInput                discard                discard
readingFrom borrowing  writeTo borrowing      writeTo borrowing
readingFrom consuming  writeTo consuming      writeTo consuming
                       collect                redirect
                       collect(limit:)

A lot of overloads! And we can't use an enum because we can't store borrowing. We can't use protocols, because move-only types do not support them. But we can:

// For a second let's assume that FileDescriptor is move-only.

struct CollectedOutputMethod: ~Copyable {
  private enum Storage: ~Copyable, Sendable {
    case consuming(FileDescriptor)
    case borrowing(FileDescriptor)
  }

  // Backing storage used by the factory methods below.
  private let raw: Storage

  static func writeAndClose(_ f: consuming FileDescriptor) -> CollectedOutputMethod {
    return CollectedOutputMethod(raw: .consuming(f))
  }

  static func write(_ f: borrowing FileDescriptor) -> CollectedOutputMethod {
    // We can't copy a FileDescriptor, but we can copy its properties.
    // It works as long as we do not try to close it.
    let copy = FileDescriptor(<copy properties>)
    return CollectedOutputMethod(raw: .borrowing(copy))
  }
}

Some people may not like it because we are (de facto) copying the move-only object. I think it is all about the semantics of move-only, not about being move-only. And the semantics are: call close to consume. In the borrowing branch we will never call close. It is kind of like move-only smart pointers: we can copy the pointer, but the contract says that we should also increment refCount. Copying is allowed; it is all about the semantics.

Obviously this is not possible because FileDescriptor is not move-only as we have to be compatible with the old Foundation. This was a theoretical exercise, but maybe somebody has some solution.

IO - Closing error

I already mentioned it a few weeks ago, but to have everything in 1 place:

Currently, if the process terminated (regardless of the status) but closing the files threw an exception, then:

  • the parent process (aka. our code) will only get the file closing exception - even if the state of the world (for example database) has changed.
  • other files will not be closed.

There is a possible double closing issue.
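One possible shape for this, as a sketch only (closeAll, CloseFailure and Outcome are made-up names, not from the proposal): try to close every file, collect the failures, and report them next to the termination status instead of in place of it.

```swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#endif

struct CloseFailure: Error {
  let fileDescriptor: Int32
  let code: Int32
}

/// Attempts to close every descriptor, even if an earlier close fails.
func closeAll(_ fileDescriptors: [Int32]) -> [CloseFailure] {
  var failures: [CloseFailure] = []
  for fd in fileDescriptors {
    if close(fd) == -1 {
      failures.append(CloseFailure(fileDescriptor: fd, code: errno))
    }
  }
  return failures
}

/// Hypothetical result shape: the termination status is always reported,
/// and close failures travel next to it instead of replacing it.
struct Outcome {
  let terminationStatus: Int32
  let closeFailures: [CloseFailure]
}
```

This way the caller still learns that the child ran to completion (and possibly changed the state of the world) even when a close went wrong.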
