I've been experimenting with the structured concurrency and I seem to have created a deadlock race condition. I can't see what I'm doing wrong but I'm not sure if it is a silly error, a fundamental misunderstanding or if it is a beta 1 bug in macOS 12.0.
The full project can be found here: GitHub - josephlord/AsyncSequnceExtensions at 5e00bd7c83f9f85dc2fb627f663ecaf2970b5a2b
When I run the testAsyncSequencePublisherBasic test on the Mac (M1 Mac Mini) it fails by appearing to deadlock about 20% of the time. I'm running in Xcode and using the "Run "xxx: Repeatedly" option by right clicking on the test. It is working in the iOS Simulator but that may just have been a consistent race winner. In the repo you will see the test timeout is only half a second bigger times make no difference.
The experiment I'm trying to do is to map from an AsyncSequence into a Combine publisher. Below the code snippet are the print outputs of a good and a bad run.
@available(iOS 15.0, macOS 12.0, *)
public struct AsyncSequencePublisher<AsyncSequenceType> : Publisher where AsyncSequenceType : AsyncSequence {
public typealias Output = AsyncSequenceType.Element
public typealias Failure = Error
let sequence: AsyncSequenceType
public init(_ sequence: AsyncSequenceType) {
self.sequence = sequence
}
fileprivate actor ASPSubscription<S> : Subscription
where S : Subscriber, S.Failure == Error, S.Input == AsyncSequenceType.Element {
var demand: Subscribers.Demand = .none
var taskHandle: Task.Handle<(), Never>?
var demandUpdatedContinuation: CheckedContinuation<Void, Never>?
private func mainLoop(seq: AsyncSequenceType, sub: S) {
Swift.print("MainLoop")
taskHandle = detach {
do {
Swift.print("Loop")
for try await element in seq {
Swift.print("element: \(element)")
await self.waitUntilReadyForMore()
guard !Task.isCancelled else { return }
await self.setDemand(demand: sub.receive(element))
}
sub.receive(completion: .finished)
} catch {
if error is Task.CancellationError { return }
sub.receive(completion: .failure(error))
}
}
Swift.print("taskHandle set")
}
private func waitUntilReadyForMore() async {
Swift.print("Wait on Demand")
if demand > 0 {
demand -= 1
Swift.print("Demand OK - continue")
return
}
let _: Void = await withCheckedContinuation { continuation in
Swift.print("Set continuation")
demandUpdatedContinuation = continuation
}
}
nonisolated init(sequence: AsyncSequenceType, subscriber: S) {
async {
await mainLoop(seq: sequence, sub: subscriber)
}
Swift.print("init returned")
}
nonisolated func request(_ demand: Subscribers.Demand) {
Swift.print("request: \(demand)")
detach {
Swift.print("Will await setDemand")
await self.setDemand(demand: demand)
Swift.print("Back from await setDemand")
}
}
nonisolated func cancel() {
detach {
Swift.print("Cancel")
await self.setCanceled()
await self.demandUpdated()// Unblock so the main loop can hit the task cancellation guard
}
}
private func demandUpdated() {
demandUpdatedContinuation?.resume()
demandUpdatedContinuation = nil
}
private func setCanceled() async {
taskHandle?.cancel()
}
private func setDemand(demand: Subscribers.Demand) async {
self.demand += demand
guard demand > 0 else { return }
demandUpdated()
}
deinit {
taskHandle?.cancel()
demandUpdatedContinuation?.resume()
}
}
public func receive<S>(subscriber: S) where S : Subscriber, Error == S.Failure, AsyncSequenceType.Element == S.Input {
let subscription = ASPSubscription(sequence: sequence, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
}
@available(iOS 15.0, macOS 12.0, *)
extension AsyncSequence {
public var publisher: AsyncSequencePublisher<Self> {
AsyncSequencePublisher(self)
}
}
A successful run looks like:
init returned
request: unlimited
MainLoop
taskHandle set
Will await setDemand
Loop
Back from await setDemand
element: 1
Wait on Demand
Demand OK - continue
received 1
finished
The prints from an unsuccessful run look like this:
init returned
request: unlimited
MainLoop
Will await setDemand
Loop
taskHandle set
element: 1
It appears that both detached tasks (in mainLoop
and request
) get blocked from entering the Actor's controlled state (waitUntilReadyForMore
and setDemand
respectively) but I can't see what could be holding the actor indefinitely.
If "Wait on Demand" was printed I would suspect that I had misunderstood the effect of withCheckedContinution
but I don't think it even gets that far.
At this point this is just experimental / learning code, I don't have an urgent need for an AsyncSequencePublisher so even if there is a built in or simpler equivalent I'd still like to understand what is going wrong here (I am interested in alternative approaches but not instead of an answer on this).
Any help gratefully received.