I need to call some external program thousands of times and handle its output using AsyncBytes.lines. Therefore, I implemented a parallel foreach as follows:
extension Sequence {
public func parallelForEach(concurrentCount: Int = ProcessInfo.processInfo.activeProcessorCount, body: @escaping (Element) async -> Void) async {
assert(concurrentCount >= 1)
let counter = Group()
let limiter = Semaphore(initialPermits: concurrentCount)
for element in self {
await limiter.wait()
Task.detached {
await counter.enter()
await body(element)
await counter.leave()
await limiter.signal()
}
}
await counter.wait()
}
}
The foreach uses a semaphore to avoid “process explosion”, and a dispatch group to wait for all async tasks to finish. Both are implemented using actors to make them good citizens for Swift concurrency:
public actor Semaphore {
public private(set) var permits: Int
private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []
public init(initialPermits: Int = 0) {
permits = initialPermits
}
public func signal() {
permits += 1
guard permits > 0, !queue.isEmpty else { return }
assert(permits == 1)
permits -= 1
let (priority, cont) = queue.first!
queue.removeFirst()
Task.detached(priority: priority) { cont.resume() }
}
public func wait() async {
guard permits <= 0 else {
permits -= 1
return
}
await withCheckedContinuation { cont in
queue.append((Task.currentPriority, cont))
}
}
}
public actor Group {
private var counter: Int = 0
private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []
public func enter() {
counter += 1
clearQueue()
}
public func leave() {
counter -= 1
clearQueue()
}
private func clearQueue() {
guard counter == 0 else { return }
let queue = self.queue
self.queue = []
for (priority, cont) in queue {
Task.detached(priority: priority) { cont.resume() }
}
}
public func wait() async {
await withCheckedContinuation { cont in
queue.append((Task.currentPriority, cont))
}
}
}
I need to discard outputs of which the program exits with a non-zero status code. To solve this, I added a helper that can inject error into a finite async sequence:
private actor CombineResultState {
private var result: Result<Void, Error>?
private var cont: CheckedContinuation<Void, Error>?
func write(failure: Error?) {
let result: Result<Void, Error>
if let failure = failure {
result = .failure(failure)
} else {
result = .success(())
}
if let cont = cont {
cont.resume(with: result)
} else {
self.result = result
}
}
func checkError() throws {
if case .failure(let error) = result {
throw error
}
}
func waitForCompletion() async throws {
guard result == nil else {
try checkError()
return
}
try await withCheckedThrowingContinuation {
self.cont = $0
}
}
}
public struct CombineResultSequence<Base: AsyncSequence>: AsyncSequence {
private let base: Base
private let failureOrDone: () async -> Error?
init(base: Base, failureOrDone: @escaping () async -> Error?) {
self.base = base
self.failureOrDone = failureOrDone
}
public typealias Element = Base.Element
public func makeAsyncIterator() -> CombineResultIterator {
let state = CombineResultState()
Task.detached {
await state.write(failure: await failureOrDone())
}
return .init(iter: base.makeAsyncIterator(), state: state)
}
public final class CombineResultIterator: AsyncIteratorProtocol {
public typealias Element = Base.Element
private var iter: Base.AsyncIterator
private let state: CombineResultState
fileprivate init(iter: Base.AsyncIterator, state: CombineResultState) {
self.iter = iter
self.state = state
}
public func next() async throws -> Element? {
try await state.checkError()
if let next = try await iter.next() {
return next
}
try await state.waitForCompletion()
return nil
}
}
}
extension AsyncSequence {
func combine(result failureOrDone: @escaping () async -> Error?) -> CombineResultSequence<Self> {
CombineResultSequence(base: self, failureOrDone: failureOrDone)
}
}
I can call it like asyncBytes.lines.combine { try await process.checkTerminationStatus() }
. Even if I have consumed the async bytes, my iterator would still wait for failureOrDone
and act accordingly.
Above is the set up. Now comes the fun part: if, in my parallel for each, I call limiter.signal()
before counter.leave()
, counter.leave()
never gets executed! Below is a minimal working example that reproduces this issue.
In this example, I did the following for 100 times. I pick an integer between 200 and 1000 and generates an async sequence of that many integers. There is a 1ms delay between two integers. I also roll the dice and decide if that experiment is a “bomb”. I yield the bomb unboxing result after 500ms, so sometimes the integer sequence has finished, and other times it is still ongoing. The sequence and the async bomb unboxing is combined using .combine
mentioned above.
I log the number of calls made to limiter.signal()
, which is wrapped in a defer statement, and I log whenever I am about to call counter.leave()
. It appears around half of the time the program exits limiter.signal()
but never continues after await limiter.signal()
in the caller.
Here is the full code:
import Foundation
let logLock = NSLock()
var signalCount = 0
var counterCount = 0
func logSignal() {
logLock.lock()
signalCount += 1
print("Signal exited \(signalCount)")
logLock.unlock()
}
func logCounter() {
logLock.lock()
counterCount += 1
print("Before counter \(counterCount)")
logLock.unlock()
}
public actor Semaphore {
public private(set) var permits: Int
private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []
public init(initialPermits: Int = 0) {
permits = initialPermits
}
public func signal() {
defer { logSignal() }
permits += 1
guard permits > 0, !queue.isEmpty else { return }
assert(permits == 1)
permits -= 1
let (priority, cont) = queue.first!
queue.removeFirst()
Task.detached(priority: priority) { cont.resume() }
}
public func wait() async {
guard permits <= 0 else {
permits -= 1
return
}
await withCheckedContinuation { cont in
queue.append((Task.currentPriority, cont))
}
}
}
public actor Group {
private var counter: Int = 0
private var queue: [(TaskPriority, CheckedContinuation<Void, Never>)] = []
public func enter() {
counter += 1
clearQueue()
}
public func leave() {
counter -= 1
clearQueue()
}
private func clearQueue() {
guard counter == 0 else { return }
let queue = self.queue
self.queue = []
for (priority, cont) in queue {
Task.detached(priority: priority) { cont.resume() }
}
}
public func wait() async {
await withCheckedContinuation { cont in
queue.append((Task.currentPriority, cont))
}
}
}
extension Sequence {
public func parallelForEach(concurrentCount: Int = ProcessInfo.processInfo.activeProcessorCount, body: @escaping (Element) async -> Void) async {
assert(concurrentCount >= 1)
let counter = Group()
let limiter = Semaphore(initialPermits: concurrentCount)
for element in self {
await limiter.wait()
Task.detached {
await counter.enter()
await body(element)
await limiter.signal()
logCounter()
await counter.leave()
}
}
await counter.wait()
}
}
private actor CombineResultState {
private var result: Result<Void, Error>?
private var cont: CheckedContinuation<Void, Error>?
func write(failure: Error?) {
let result: Result<Void, Error>
if let failure = failure {
result = .failure(failure)
} else {
result = .success(())
}
if let cont = cont {
cont.resume(with: result)
} else {
self.result = result
}
}
func checkError() throws {
if case .failure(let error) = result {
throw error
}
}
func waitForCompletion() async throws {
guard result == nil else {
try checkError()
return
}
try await withCheckedThrowingContinuation {
self.cont = $0
}
}
}
public struct CombineResultSequence<Base: AsyncSequence>: AsyncSequence {
private let base: Base
private let failureOrDone: () async -> Error?
init(base: Base, failureOrDone: @escaping () async -> Error?) {
self.base = base
self.failureOrDone = failureOrDone
}
public typealias Element = Base.Element
public func makeAsyncIterator() -> CombineResultIterator {
let state = CombineResultState()
Task.detached {
await state.write(failure: await failureOrDone())
}
return .init(iter: base.makeAsyncIterator(), state: state)
}
public final class CombineResultIterator: AsyncIteratorProtocol {
public typealias Element = Base.Element
private var iter: Base.AsyncIterator
private let state: CombineResultState
fileprivate init(iter: Base.AsyncIterator, state: CombineResultState) {
self.iter = iter
self.state = state
}
public func next() async throws -> Element? {
try await state.checkError()
if let next = try await iter.next() {
return next
}
try await state.waitForCompletion()
return nil
}
}
}
extension AsyncSequence {
func combine(result failureOrDone: @escaping () async -> Error?) -> CombineResultSequence<Self> {
CombineResultSequence(base: self, failureOrDone: failureOrDone)
}
}
extension Int {
func boringSequence() -> AsyncStream<Int> {
var i: Int = 0
let max = self
return AsyncStream {
try! await Task.sleep(nanoseconds: 1000_000)
i += 1
if i > max {
return nil
}
return i
}
}
}
struct Bomb: Error {}
let group = DispatchGroup()
group.enter()
Task.detached {
await (0..<100).parallelForEach { i in
let count = (200..<1000).randomElement()!
let sum = try? await count
.boringSequence()
.combine {
let isBomb = [true, false].randomElement()!
try! await Task.sleep(nanoseconds: 500_000_000)
if isBomb {
return Bomb()
}
return nil
}
.reduce(into: 0) { $0 += $1 }
if let sum = sum {
print(i, sum)
} else {
print(i, "Bomb")
}
}
group.leave()
}
group.wait()
As mentioned before, if you switch the order of signaling the limiter and leaving the counter, it works perfectly. In my actual code that calls an external program, in the order that gets my code stuck, I also see issue like EXC_BAD_ACCESS, SIGCHILD, and the memory sanitizer sometimes complains about zombie objects, in addition to the “deadlock” issue. Also, if I do not try to inject an error into the async sequence, the issue is gone.
I wonder if there is something wrong with my implementation, some deeply hidden data race, or is there some bug in the Swift compiler? I am using:
- Apple Swift version 5.5.2 (swiftlang-1300.0.47.5 clang-1300.0.29.30)
- Xcode Version 13.2.1 (13C100)
- macOS Monterey 12.1