Execution stops after returning from an async function

I need to call an external program thousands of times and handle its output using AsyncBytes.lines, so I implemented a parallel forEach as follows:

extension Sequence {
    public func parallelForEach(concurrentCount: Int = ProcessInfo.processInfo.activeProcessorCount, body: @escaping (Element) async -> Void) async {
        assert(concurrentCount >= 1)
        let counter = Group()
        let limiter = Semaphore(initialPermits: concurrentCount)

        for element in self {
            await limiter.wait()
            Task.detached {
                await counter.enter()
                await body(element)
                await counter.leave()
                await limiter.signal()
            }
        }

        await counter.wait()
    }
}

The forEach uses a semaphore to avoid a “process explosion” and a dispatch-group-like counter to wait for all async tasks to finish. Both are implemented as actors so that they are good citizens of Swift concurrency:

public actor Semaphore {
    public private(set) var permits: Int
    private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []

    public init(initialPermits: Int = 0) {
        permits = initialPermits
    }

    public func signal() {
        permits += 1
        guard permits > 0, !queue.isEmpty else { return }
        assert(permits == 1)

        permits -= 1
        let (priority, cont) = queue.first!
        queue.removeFirst()
        Task.detached(priority: priority) { cont.resume() }
    }

    public func wait() async {
        guard permits <= 0 else {
            permits -= 1
            return
        }

        await withCheckedContinuation { cont in
            queue.append((Task.currentPriority, cont))
        }
    }
}

public actor Group {
    private var counter: Int = 0
    private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []

    public func enter() {
        counter += 1
        clearQueue()
    }

    public func leave() {
        counter -= 1
        clearQueue()
    }

    private func clearQueue() {
        guard counter == 0 else { return }
        let queue = self.queue
        self.queue = []
        for (priority, cont) in queue {
            Task.detached(priority: priority) { cont.resume() }
        }
    }

    public func wait() async {
        await withCheckedContinuation { cont in
            queue.append((Task.currentPriority, cont))
        }
    }
}

I need to discard the output of any run in which the program exits with a non-zero status code. To handle this, I added a helper that can inject an error into a finite async sequence:

private actor CombineResultState {
    private var result: Result<Void, Error>?
    private var cont: CheckedContinuation<Void, Error>?

    func write(failure: Error?) {
        let result: Result<Void, Error>
        if let failure = failure {
            result = .failure(failure)
        } else {
            result = .success(())
        }
        if let cont = cont {
            cont.resume(with: result)
        } else {
            self.result = result
        }
    }

    func checkError() throws {
        if case .failure(let error) = result {
            throw error
        }
    }

    func waitForCompletion() async throws {
        guard result == nil else {
            try checkError()
            return
        }

        try await withCheckedThrowingContinuation {
            self.cont = $0
        }
    }
}

public struct CombineResultSequence<Base: AsyncSequence>: AsyncSequence {
    private let base: Base
    private let failureOrDone: () async -> Error?

    init(base: Base, failureOrDone: @escaping () async -> Error?) {
        self.base = base
        self.failureOrDone = failureOrDone
    }

    public typealias Element = Base.Element

    public func makeAsyncIterator() -> CombineResultIterator {
        let state = CombineResultState()
        Task.detached {
            await state.write(failure: await failureOrDone())
        }
        return .init(iter: base.makeAsyncIterator(), state: state)
    }

    public final class CombineResultIterator: AsyncIteratorProtocol {
        public typealias Element = Base.Element

        private var iter: Base.AsyncIterator
        private let state: CombineResultState

        fileprivate init(iter: Base.AsyncIterator, state: CombineResultState) {
            self.iter = iter
            self.state = state
        }

        public func next() async throws -> Element? {
            try await state.checkError()
            if let next = try await iter.next() {
                return next
            }
            try await state.waitForCompletion()
            return nil
        }
    }
}

extension AsyncSequence {
    func combine(result failureOrDone: @escaping () async -> Error?) -> CombineResultSequence<Self> {
        CombineResultSequence(base: self, failureOrDone: failureOrDone)
    }
}

I call it like asyncBytes.lines.combine { try await process.checkTerminationStatus() }. Even after the async bytes have been fully consumed, the iterator still waits for failureOrDone and then either finishes normally or throws the error it returned.
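For comparison, the same "forward everything, then report the exit status" idea can be sketched on top of AsyncThrowingStream instead of a hand-written iterator plus state actor. The function name `combined(_:failureOrDone:)` is hypothetical, and this is not a drop-in replacement: unlike CombineResultSequence, it only surfaces the failure after the base sequence is exhausted, rather than also checking before each element.

```swift
import Foundation

/// Hypothetical variant of `combine(result:)` built on AsyncThrowingStream.
/// Forwards every element of `base`, then awaits `failureOrDone` and
/// finishes the stream with that error (or normally when it returns nil).
func combined<Base: AsyncSequence>(
    _ base: Base,
    failureOrDone: @escaping @Sendable () async -> Error?
) -> AsyncThrowingStream<Base.Element, Error> {
    AsyncThrowingStream { continuation in
        let producer = Task {
            do {
                for try await element in base {
                    continuation.yield(element)
                }
                // finish(throwing: nil) ends the stream normally.
                continuation.finish(throwing: await failureOrDone())
            } catch {
                // Errors thrown by the base sequence are forwarded unchanged.
                continuation.finish(throwing: error)
            }
        }
        // Stop producing if the consumer stops iterating early.
        continuation.onTermination = { _ in producer.cancel() }
    }
}
```

A call site would pass the same closure that is given to .combine; the stream's unbounded buffer means the producer never waits for the consumer.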


Above is the setup. Now comes the fun part: if, in my parallelForEach, I call limiter.signal() before counter.leave(), counter.leave() never gets executed! Below is a minimal example that reproduces the issue.

In this example, I do the following 100 times: I pick a random integer in 200..<1000 and generate an async sequence of that many integers, with a 1 ms delay between consecutive integers. I also flip a coin to decide whether that run is a “bomb”. The bomb result is delivered after 500 ms, so sometimes the integer sequence has already finished and other times it is still running. The integer sequence and the delayed bomb result are combined using the .combine helper above.

I log every exit from limiter.signal() (the logging call is in a defer statement) and every time I am about to call counter.leave(). Around half of the time, the program exits limiter.signal() but never continues past await limiter.signal() in the caller.

Here is the full code:

import Foundation

let logLock = NSLock()

var signalCount = 0
var counterCount = 0

func logSignal() {
    logLock.lock()
    signalCount += 1
    print("Signal exited \(signalCount)")
    logLock.unlock()
}

func logCounter() {
    logLock.lock()
    counterCount += 1
    print("Before counter \(counterCount)")
    logLock.unlock()
}

public actor Semaphore {
    public private(set) var permits: Int
    private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []

    public init(initialPermits: Int = 0) {
        permits = initialPermits
    }

    public func signal() {
        defer { logSignal() }
        permits += 1
        guard permits > 0, !queue.isEmpty else { return }
        assert(permits == 1)

        permits -= 1
        let (priority, cont) = queue.first!
        queue.removeFirst()
        Task.detached(priority: priority) { cont.resume() }
    }

    public func wait() async {
        guard permits <= 0 else {
            permits -= 1
            return
        }

        await withCheckedContinuation { cont in
            queue.append((Task.currentPriority, cont))
        }
    }
}

public actor Group {
    private var counter: Int = 0
    private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []

    public func enter() {
        counter += 1
        clearQueue()
    }

    public func leave() {
        counter -= 1
        clearQueue()
    }

    private func clearQueue() {
        guard counter == 0 else { return }
        let queue = self.queue
        self.queue = []
        for (priority, cont) in queue {
            Task.detached(priority: priority) { cont.resume() }
        }
    }

    public func wait() async {
        await withCheckedContinuation { cont in
            queue.append((Task.currentPriority, cont))
        }
    }
}

extension Sequence {
    public func parallelForEach(concurrentCount: Int = ProcessInfo.processInfo.activeProcessorCount, body: @escaping (Element) async -> Void) async {
        assert(concurrentCount >= 1)
        let counter = Group()
        let limiter = Semaphore(initialPermits: concurrentCount)

        for element in self {
            await limiter.wait()
            Task.detached {
                await counter.enter()
                await body(element)
                await limiter.signal()
                logCounter()
                await counter.leave()
            }
        }

        await counter.wait()
    }
}

private actor CombineResultState {
    private var result: Result<Void, Error>?
    private var cont: CheckedContinuation<Void, Error>?

    func write(failure: Error?) {
        let result: Result<Void, Error>
        if let failure = failure {
            result = .failure(failure)
        } else {
            result = .success(())
        }
        if let cont = cont {
            cont.resume(with: result)
        } else {
            self.result = result
        }
    }

    func checkError() throws {
        if case .failure(let error) = result {
            throw error
        }
    }

    func waitForCompletion() async throws {
        guard result == nil else {
            try checkError()
            return
        }

        try await withCheckedThrowingContinuation {
            self.cont = $0
        }
    }
}

public struct CombineResultSequence<Base: AsyncSequence>: AsyncSequence {
    private let base: Base
    private let failureOrDone: () async -> Error?

    init(base: Base, failureOrDone: @escaping () async -> Error?) {
        self.base = base
        self.failureOrDone = failureOrDone
    }

    public typealias Element = Base.Element

    public func makeAsyncIterator() -> CombineResultIterator {
        let state = CombineResultState()
        Task.detached {
            await state.write(failure: await failureOrDone())
        }
        return .init(iter: base.makeAsyncIterator(), state: state)
    }


    public final class CombineResultIterator: AsyncIteratorProtocol {
        public typealias Element = Base.Element

        private var iter: Base.AsyncIterator
        private let state: CombineResultState

        fileprivate init(iter: Base.AsyncIterator, state: CombineResultState) {
            self.iter = iter
            self.state = state
        }

        public func next() async throws -> Element? {
            try await state.checkError()
            if let next = try await iter.next() {
                return next
            }
            try await state.waitForCompletion()
            return nil
        }
    }
}

extension AsyncSequence {
    func combine(result failureOrDone: @escaping () async -> Error?) -> CombineResultSequence<Self> {
        CombineResultSequence(base: self, failureOrDone: failureOrDone)
    }
}

extension Int {
    func boringSequence() -> AsyncStream<Int> {
        var i: Int = 0
        let max = self
        return AsyncStream {
            try! await Task.sleep(nanoseconds: 1_000_000)
            i += 1
            if i > max {
                return nil
            }
            return i
        }

    }
}

struct Bomb: Error {}

let group = DispatchGroup()
group.enter()
Task.detached {
    await (0..<100).parallelForEach { i in
        let count = (200..<1000).randomElement()!
        let sum = try? await count
            .boringSequence()
            .combine { 
                let isBomb = [true, false].randomElement()!
                try! await Task.sleep(nanoseconds: 500_000_000)
                if isBomb {
                    return Bomb()
                }
                return nil
            }
            .reduce(into: 0) { $0 += $1 }
        if let sum = sum {
            print(i, sum)
        } else {
            print(i, "Bomb")
        }
    }
    group.leave()
}
group.wait()

As mentioned before, if I switch the order of signaling the limiter and leaving the counter, it works perfectly. In my actual code, which calls an external program, the order that gets stuck also produces issues such as EXC_BAD_ACCESS and SIGCHLD, and the memory sanitizer sometimes complains about zombie objects, in addition to the “deadlock”. Also, if I do not inject an error into the async sequence, the issue is gone.

Is there something wrong with my implementation, some deeply hidden data race, or is this a bug in the Swift compiler? I am using:

  • Apple Swift version 5.5.2 (swiftlang-1300.0.47.5 clang-1300.0.29.30)
  • Xcode Version 13.2.1 (13C100)
  • macOS Monterey 12.1

It's much simpler to build something like parallelForEach on top of Task or TaskGroup than to write your own synchronization abstractions.

extension Sequence where Element: Sendable {
    func unboundedParallelForEach(_ closure: @Sendable @escaping (Element) -> Void) async {
        let tasks = map { element in
            Task { closure(element) }
        }
        
        for task in tasks {
            _ = await task.value
        }
    }
}

extension Collection where Element: Sendable {
    func parallelForEach(maxWidth: Int = 10, closure: @Sendable @escaping (Element) -> Void) async {
        await withTaskGroup(of: Void.self) { group in
            let initialBatch = (maxWidth <= count) ? maxWidth : count
            var remainder = count - initialBatch

            for element in prefix(initialBatch) {
                group.addTask {
                    closure(element)
                }
            }
            
            let rest = suffix(remainder)
            var iterator = rest.makeIterator()
            for await _ in group {
                guard remainder > 0, let element = iterator.next() else { break }
                
                group.addTask {
                    closure(element)
                }
                
                remainder -= 1
            }
        }
    }
}

You'll also want to make sure you're using the Xcode 13.3 betas, as they include many new concurrency-safety warnings and fix a lot of bugs. You can also enable the concurrency warnings in 13.2, but I find them more useful in 13.3.
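The TaskGroup version above takes a synchronous closure and requires a Collection, while the question's body is async and iterates an arbitrary Sequence. A minimal sketch adapting the same pattern under those assumptions (the name `boundedParallelForEach` is hypothetical) could look like this:

```swift
import Foundation

extension Sequence where Element: Sendable {
    /// Hypothetical helper: runs `body` for every element, keeping at most
    /// `maxConcurrent` child tasks in flight at any moment.
    func boundedParallelForEach(
        maxConcurrent: Int = ProcessInfo.processInfo.activeProcessorCount,
        _ body: @escaping @Sendable (Element) async -> Void
    ) async {
        precondition(maxConcurrent >= 1, "maxConcurrent must be at least 1")
        await withTaskGroup(of: Void.self) { group in
            var iterator = makeIterator()
            // Start the first batch of up to maxConcurrent child tasks.
            for _ in 0..<maxConcurrent {
                guard let element = iterator.next() else { break }
                group.addTask { await body(element) }
            }
            // Each time a child finishes, start the next element, if any.
            // The loop ends once every child task has completed.
            while await group.next() != nil {
                if let element = iterator.next() {
                    group.addTask { await body(element) }
                }
            }
        }
    }
}
```

Because a new child is only added after group.next() returns, at most maxConcurrent bodies run at once, and withTaskGroup does not return until every child has finished, so neither a separate semaphore nor a group counter is needed.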

Thanks! I never fully understood the purpose of Task and TaskGroup. Your example is great.

I am downloading Xcode 13.3 (slow Internet connection) and will post the new result. Hopefully this is just a bug that has already been fixed.

The same issue persists on the latest Xcode 13.3 beta 2. The compiler asked me to add two Sendable annotations to parallelForEach, but that is about it:

extension Sequence where Element: Sendable {
    public func parallelForEach(concurrentCount: Int = ProcessInfo.processInfo.activeProcessorCount, body: @escaping @Sendable (Element) async -> Void) async {
        assert(concurrentCount >= 1)
        let counter = Group()
        let limiter = Semaphore(initialPermits: concurrentCount)

        for element in self {
            await limiter.wait()
            Task.detached {
                await counter.enter()
                await body(element)
                await limiter.signal()
                logCounter()
                await counter.leave()
            }
        }

        await counter.wait()
    }
}

The whole program still gets stuck. Note this part of my test code:

Task.detached {
    await (0..<100).parallelForEach { i in
        let count = (200..<1000).randomElement()!
        let sum = try? await count
            .boringSequence()
            .combine { 
                let isBomb = [true, false].randomElement()!
                try! await Task.sleep(nanoseconds: 500_000_000)
                if isBomb {
                    return Bomb()
                }
                return nil
            }
            .reduce(into: 0) { $0 += $1 }
        if let sum = sum {
            print(i, sum)
        } else {
            print(i, "Bomb")
        }
    }
    group.leave()
}

If I remove the .combine call, everything works again. However, I cannot see how CombineResultSequence could have anything to do with parallelForEach’s execution…

I don’t see a bug at a glance, but this is fairly subtle. It’s possible that the outer use of DispatchGroup is blocking progress, but I’d think that there would be at least one other thread available to service things. Can you tell in the debugger what your threads are actually doing when you’re deadlocked?

One thing I did notice that isn’t a bug, but that you might as well fix: resuming a continuation is already asynchronous (it schedules the suspended task rather than running it inline), so you don’t need to create a task to do it.
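Applied to the Semaphore actor from the question, that suggestion might look roughly like the sketch below. It is an illustration, not the poster's code: continuations are resumed directly, and the per-waiter TaskPriority bookkeeping is dropped for brevity (an assumption that priorities are not needed here).

```swift
/// Sketch of the question's Semaphore with continuations resumed directly
/// instead of from Task.detached. Waiter priorities are not tracked.
public actor Semaphore {
    public private(set) var permits: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    public init(initialPermits: Int = 0) {
        permits = initialPermits
    }

    public func wait() async {
        if permits > 0 {
            permits -= 1
            return
        }
        // The body runs synchronously on the actor before the task suspends,
        // so registering the continuation cannot race with signal().
        await withCheckedContinuation { waiters.append($0) }
    }

    public func signal() {
        guard !waiters.isEmpty else {
            permits += 1
            return
        }
        // Hand the permit straight to the oldest waiter. resume() only
        // schedules the waiting task; it does not run it inline.
        waiters.removeFirst().resume()
    }
}
```

Handing the permit directly to a waiter (instead of incrementing and then decrementing) keeps permits non-negative and avoids the assert in the original signal().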

Thanks! I have fixed the continuation-resuming issue. I have also moved to a Swift package executable target that uses the new @main attribute, so there is no more DispatchGroup.
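For the Group actor, the same change could be sketched as follows. This is a guess at what such a fix might look like, not the poster's actual code. It also makes one change of its own, which is an assumption not discussed in the thread: wait() returns immediately when the counter is already zero, whereas the original suspends until the counter next returns to zero.

```swift
/// Sketch of the question's Group with continuations resumed directly.
/// Assumption (not from the thread): wait() returns at once when idle.
public actor Group {
    private var counter = 0
    private var waiters: [CheckedContinuation<Void, Never>] = []

    public init() {}

    public func enter() {
        counter += 1
    }

    public func leave() {
        counter -= 1
        guard counter == 0 else { return }
        // The group has drained: wake every waiter.
        let ready = waiters
        waiters = []
        for waiter in ready {
            waiter.resume()
        }
    }

    public func wait() async {
        // Nothing in flight, so there is nothing to wait for.
        guard counter > 0 else { return }
        await withCheckedContinuation { waiters.append($0) }
    }
}
```

The sketch assumes enter() is called before the corresponding task is created, so the counter cannot reach zero while work is still being scheduled.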

The interesting thing is that I now get random EXC_BAD_ACCESS crashes. If I turn on Address Sanitizer, no such error happens, and the program just hangs. In Xcode, thread 1 is in mach_msg_trap and the other threads are in __workq_kernreturn. Sometimes one or two threads show error: memory read failed for 0x0.

I am using Xcode 13.3 beta 2. As said before, this issue only occurs if I call limiter.signal() before counter.leave() AND the work closure “injects” an error into an async sequence.

Here is a link to the latest code, ready to use as a Swift package.