AsyncThrowingStream unexpectedly hanging

When I run the following code, the program gets stuck after printing:

runner_1_start
{"type":"ready"}
runner_1_stop
runner_2_start

and does not produce the expected output

runner_1_start
{"type":"ready"}
runner_1_stop
runner_2_start
{"type":"ready"}
runner_2_stop

until the first process terminates.

import Foundation

class Runner {
    let events: AsyncThrowingStream<String, any Error>

    init(executablePath: String, arguments: [String]) throws {
        let process = Process()
        let stdin = Pipe()
        let stdout = Pipe()

        process.executableURL = URL(filePath: executablePath)
        process.arguments = arguments
        process.standardInput = stdin
        process.standardOutput = stdout

        events = AsyncThrowingStream<String, any Error> { continuation in
            Task {
                for try await line in stdout.fileHandleForReading.bytes.lines {
                    continuation.yield(line)
                }
            }
        }

        try process.run()
    }
}

let code =
    """
    import sys
    import time

    time.sleep(1)
    sys.stdout.write('{"type":"ready"}\\n')
    sys.stdout.flush()
    time.sleep(10)
    """

let executablePath = "/usr/bin/python3"
let arguments = ["-c", code]

let runner_1 = try Runner(
    executablePath: executablePath,
    arguments: arguments
)

print("runner_1_start")

for try await line in runner_1.events {
    print(line)
    break
}

print("runner_1_stop")

let runner_2 = try Runner(
    executablePath: executablePath,
    arguments: arguments
)

print("runner_2_start")

// Problem occurs here
for try await line in runner_2.events {
    print(line)
    break
}

print("runner_2_stop")

I have already tried using a detached task, and the problem persists. The issue does not occur if stdout.fileHandleForReading.bytes.lines is iterated directly, without going through an AsyncThrowingStream. I am not sure what exactly is going wrong here.

This is being run on:

swift-driver version: 1.120.5 Apple Swift version 6.1.2 (swiftlang-6.1.2.1.2 clang-1700.0.13.5)
Target: arm64-apple-macosx15.0

Thanks in advance for any help.

That API requires a CFRunLoop on the caller's executing thread for any meaningful events to occur when interacting with Process. When you iterate it as a bare sequence (which is honestly the more appropriate and efficient approach than wrapping it in an AsyncStream; converting one AsyncSequence into another just adds overhead), it actually runs on the main actor, which has a run loop. When you run it in a Task or a detached task, however, it executes on a dispatch queue as part of the default executor. Those threads have no run loops installed, and even if you access one from that isolation, nothing will ever spin it, so the process's output is never drained.

FileHandle.bytes is of type AsyncBytes, but crucially it does not read anything asynchronously. It uses a single global actor that performs blocking reads from the file descriptors on Swift Concurrency's thread pool, where you must never block.

So using FileHandle.bytes on a Pipe (which is what you're doing here) is almost guaranteed to get you into a deadlock.

And just to be clear: That's a serious implementation problem with FileHandle.bytes and not your fault for using it.
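One way to avoid those blocking reads without a third-party package is to let Foundation notify you through FileHandle.readabilityHandler (invoked on a Foundation-managed dispatch queue rather than on the cooperative pool) and bridge that into an AsyncStream. This is a sketch under that assumption, not something proposed in this thread; lineStream(from:) and LineBuffer are hypothetical names.

```swift
import Foundation

/// Hypothetical buffer shared with the readability handler. Foundation calls a
/// handle's readabilityHandler serially, so unsynchronized access is assumed safe.
final class LineBuffer: @unchecked Sendable {
    var data = Data()
}

/// Hypothetical helper (not from the thread): an AsyncStream of text lines fed
/// by readabilityHandler, so no Swift Concurrency thread ever blocks in read(2).
func lineStream(from handle: FileHandle) -> AsyncStream<String> {
    AsyncStream { continuation in
        let buffer = LineBuffer()
        handle.readabilityHandler = { h in
            let chunk = h.availableData
            guard !chunk.isEmpty else {
                // EOF: stop watching, flush an unterminated final line, finish.
                h.readabilityHandler = nil
                if !buffer.data.isEmpty {
                    continuation.yield(String(decoding: buffer.data, as: UTF8.self))
                }
                continuation.finish()
                return
            }
            buffer.data.append(chunk)
            // Yield every complete "\n"-terminated line currently buffered.
            while let newline = buffer.data.firstIndex(of: UInt8(ascii: "\n")) {
                let line = buffer.data[buffer.data.startIndex..<newline]
                continuation.yield(String(decoding: line, as: UTF8.self))
                buffer.data.removeSubrange(buffer.data.startIndex...newline)
            }
        }
        // If the consumer stops early (e.g. `break`), detach the handler.
        continuation.onTermination = { _ in handle.readabilityHandler = nil }
    }
}

// Usage with an in-memory pipe standing in for a child process's stdout.
let pipe = Pipe()
pipe.fileHandleForWriting.write(Data("first\nsecond\npartial".utf8))
try pipe.fileHandleForWriting.close()

var received: [String] = []
for await line in lineStream(from: pipe.fileHandleForReading) {
    received.append(line)
}
print(received)
```

In the question's Runner, events could then be built with lineStream(from: stdout.fileHandleForReading); its type would become AsyncStream<String>, since this sketch never throws.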

If you need subprocesses with Swift Concurrency, you can use either of the following:

  • The AsyncProcess package:

    import AsyncProcess
    import AsyncAlgorithms // for `merge`ing stdout & stderr, optional if you don't need that

    let exe = ProcessExecutor(executable: "/bin/bash", ["-c", "echo hello world"])
    async let result = exe.run()
    for line in try await merge(exe.standardOutput.splitIntoLines(), exe.standardError.splitIntoLines()).strings {
        print("output line", line)
    }
    print("execution result", try await result)
  • The nascent https://github.com/swiftlang/swift-subprocess package, which implements many of the same features. There are still a number of bugs that need ironing out, but hopefully it will become an option soon too.

@Philippe_Hausler @johannesweiss – out of curiosity, how did you know about (or determine) these caveats? Are these behaviors (requiring a run loop for Process, using a single global actor that performs blocking fd reads) documented anywhere that you know of?


You can show this with the following program. It creates two pipes (pipe1 and pipe2), never writes into pipe1, but does write to pipe2. It then starts a child task that reads from pipe1, while the main task reads from pipe2.

The expected behaviour is: The read from pipe2 succeeds immediately and our program returns.

What happens in reality is that FileHandle.bytes (due to the aforementioned implementation problem) starts a single blocking read from pipe1 (which of course never returns, because there's nothing to read), and the would-be-successful read from pipe2 never even gets started: deadlock.

import Foundation


func go() async throws {
    let pipe1 = Pipe()
    let pipe2 = Pipe()

    var it2 = pipe2.fileHandleForReading.bytes.makeAsyncIterator()

    async let _ = {
        // child task that reads from pipe1 (which we never write into)
        // so it'll just sit there
        var it1 = pipe1.fileHandleForReading.bytes.makeAsyncIterator()
        let chunk1 = try await it1.next() // will never complete
        print("weird, unexpectedly read from pipe1", chunk1.debugDescription)
        return chunk1
    }()

    try await Task.sleep(nanoseconds: 500_000_000) // some time for it1 to start

    pipe2.fileHandleForWriting.write(Data("hello\n".utf8))

    print("written into pipe2, waiting for bytes to be readable (this should be instant...")
    fflush(stdout)

    let chunk2 = try await it2.next() // should immediately complete
    print("read from pipe2 (expected to work instantly)", chunk2.debugDescription)

    // returning from this function should cancel chunk1's child task
}

try await go()

actual output:

$ swiftc -o AsyncBytesRead test.swift && ./AsyncBytesRead
written into pipe2, waiting for bytes to be readable (this should be instant...
[... hang forever ...]

blocking read:

We can check what's going on by sampling the program with sample AsyncBytesRead; its output contains:

    8628 Thread_1558316
      8628 completeTask(swift::AsyncContext*, swift::SwiftError*)  (in libswift_Concurrency.dylib) + 1  [0x271f767bd]
        8628 partial apply for implicit closure #1 in go()  (in AsyncBytesRead) + 1  [0x1023a9d7d]
          8628 implicit closure #1 in go()  (in AsyncBytesRead) + 1  [0x1023a9bc5]
            8628 closure #1 in implicit closure #1 in go()  (in AsyncBytesRead) + 1  [0x1023aa0fd]
              8628 NSFileHandle.AsyncBytes.Iterator.next()  (in Foundation) + 1  [0x18b4db1ad]
                8628 _AsyncBytesBuffer.reloadBufferAndNext()  (in Foundation) + 1  [0x18b4d9805]
                  8628 partial apply for closure #1 in NSFileHandle.AsyncBytes.Iterator.init(file:)  (in Foundation) + 1  [0x18b4da665]
                    8628 closure #1 in NSFileHandle.AsyncBytes.Iterator.init(file:)  (in Foundation) + 76  [0x18b4da090]
                      8628 read  (in libsystem_kernel.dylib) + 8  [0x18969a7dc]   <<<--- BLOCKING READ

And swift inspect dump-concurrency AsyncBytesRead shows that we indeed have the two expected tasks (the main task and the async let child task), plus Foundation.IOActor:

$ sudo swift inspect dump-concurrency AsyncBytesRead
TASKS
        
      Task 0x1 - flags=future|enqueued enqueuePriority=0x15 maxPriority=0x0 address=0x12c804ff0
        async backtrace: partial apply for closure #1 in NSFileHandle.AsyncBytes.Iterator.init(file:)
                         _AsyncBytesBuffer.reloadBufferAndNext()
                         NSFileHandle.AsyncBytes.Iterator.next()
                         go()
                         async_MainTQ0_
                         thunk for @escaping @convention(thin) @async () -> ()
                         partial apply for thunk for @escaping @convention(thin) @async () -> ()
                         completeTaskWithClosure(swift::AsyncContext*, swift::SwiftError*)
        resume function: closure #1 in NSFileHandle.AsyncBytes.Iterator.init(file:) in Foundation
        task allocator: 3048 bytes in 6 chunks
        * 1 child task
             
        `--Task 0x2 - flags=childTask|future|asyncLetTask|running enqueuePriority=0x15 maxPriority=0x0 address=0x12d009a90
             current task on thread 0x17c72c
             parent: 0x12c804ff0
             waiting on thread: port=0x2003 id=0x17c721
             resume function: closure #1 in NSFileHandle.AsyncBytes.Iterator.init(file:) in Foundation
             task allocator: 1272 bytes in 4 chunks

ACTORS
  0x600002f9c0e0 Foundation.IOActor state=idle flags=0 maxPriority=0x0
    no jobs queued

THREADS
  Thread 0x17c72c - current task: 0x2
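For comparison, here is a sketch of the same two-pipe experiment with each read end watched via FileHandle.readabilityHandler instead of FileHandle.bytes. This substitution is an assumption of this sketch, not part of the original post; chunks(from:) and goNonBlocking() are hypothetical names. Because the idle reader on pipe1 is only a suspended task rather than a thread parked in read(2), the read from pipe2 should complete.

```swift
import Foundation

/// Hypothetical helper (not from the thread): raw chunks from a FileHandle,
/// delivered by Foundation's readabilityHandler instead of blocking reads.
func chunks(from handle: FileHandle) -> AsyncStream<Data> {
    AsyncStream { continuation in
        handle.readabilityHandler = { h in
            let data = h.availableData
            if data.isEmpty {
                // EOF: stop watching the handle and end the stream.
                h.readabilityHandler = nil
                continuation.finish()
            } else {
                continuation.yield(data)
            }
        }
        // Consumer cancelled or finished: detach the handler.
        continuation.onTermination = { _ in handle.readabilityHandler = nil }
    }
}

/// Same shape as go() above, but using the readabilityHandler-based reader.
func goNonBlocking() async -> String? {
    let pipe1 = Pipe() // never written to
    let pipe2 = Pipe()
    let idleReader = pipe1.fileHandleForReading

    // Task "reading" pipe1: it is suspended, not blocking a thread.
    let idle = Task {
        for await chunk in chunks(from: idleReader) {
            print("weird, unexpectedly read from pipe1", chunk)
        }
    }
    defer { idle.cancel() } // cancellation ends the pipe1 stream

    try? await Task.sleep(nanoseconds: 500_000_000) // let the idle reader start

    pipe2.fileHandleForWriting.write(Data("hello\n".utf8))

    var it2 = chunks(from: pipe2.fileHandleForReading).makeAsyncIterator()
    guard let chunk2 = await it2.next() else { return nil }
    return String(decoding: chunk2, as: UTF8.self)
}

let result = await goNonBlocking()
print("read from pipe2:", result.debugDescription)
```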