Crash in "safe" swift with actors/concurrency

So, I was trying to create a wrapper that would allow me to iterate a single async iterator from multiple concurrent tasks. I tried a naïve approach:

actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {

    var iterator: Sequence.AsyncIterator

    init(iterating sequence: Sequence) {
        self.iterator = sequence.makeAsyncIterator()
    }

    func next() async rethrows -> Sequence.Element? {
        try await iterator.next()
    }

}

This immediately fails with a compiler error I hadn't seen before:

swiftc -strict-concurrency=complete actor-iterator.swift
actor-iterator.swift:11:28: error: cannot call mutating async function 'next()' on actor-isolated property 'iterator'
        try await iterator.next()
                           ^

It seems this error exists because the call could mutate the actor-protected value in iterator outside the actor, since actor isolation ends at an await? I've read several threads about this, though, and I'm not certain I've yet read or understood the "actual" reason for this error...

In any case, one can dodge the error easily enough:

actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {

    class Inner {
        var iterator: Sequence.AsyncIterator

        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }

        func next() async rethrows -> Sequence.Element? {
            try await iterator.next()
        }
    }

    let inner: Inner

    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }

    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }

}

This compiles without error. I find that concerning: it's exactly equivalent to the previous code (except with an additional allocation); whatever behavior the previous error was trying to prevent is certainly present in this code.

Let's try to break it. Here's a little async sequence that uses Task.yield() to pretend it's doing something actually asynchronous:

struct AsyncThousand: AsyncSequence, Sendable {

    typealias Element = Int

    struct AsyncIterator: AsyncIteratorProtocol {

        var value = 0

        mutating func next() async -> Int? {
            let v = value
            await Task.yield()
            if v < 1000 {
                value = v + 1
                return v
            } else {
                return nil
            }
        }

    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator()
    }

}

And here's a little test harness that uses my SharedAsyncIterator above to iterate this sequence from ten tasks in parallel:

let sharedIterator = SharedAsyncIterator(iterating: AsyncThousand())

let allResults = await withTaskGroup(of: [Int].self) { group in
    for _ in 0..<10 {
        group.addTask {
            var results = [Int]()
            while let result = await sharedIterator.next() {
                results.append(result)
            }
            return results
        }
    }
    var allResults = [Int]()
    for await results in group {
        allResults.append(contentsOf: results)
    }
    return allResults
}
print(allResults.count)
print(Set(allResults).count)

Running this, we can see we have problems, though no crashes:

swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator
1505
1000

505 elements of the AsyncThousand stream have been duplicated (1505 results, but only 1000 distinct values), because the iterator's next method was "reentered" while it was suspended at Task.yield().
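The duplication is the classic actor-reentrancy hazard: state read before an await may be stale after it. Here is a minimal sketch of the same effect without any iterator. RacyCounter and runTenTasks are hypothetical names used only for illustration. The first method reads, suspends, then writes a value computed from the old read, much like AsyncThousand's next() does. The second method does its read-modify-write after the suspension.

```swift
// Hypothetical actor illustrating reentrancy across an await.
actor RacyCounter {
    private(set) var value = 0

    // Same shape as AsyncThousand.next(): read, suspend, write a stale result.
    func incrementAcrossSuspension() async {
        let snapshot = value          // read
        await Task.yield()            // other calls may run on the actor here
        value = snapshot + 1          // write based on a possibly stale read
    }

    // Suspend first, then read and write in one synchronous step on the actor.
    func incrementAfterSuspension() async {
        await Task.yield()
        value += 1
    }
}

// Runs `body` from ten concurrent child tasks against one shared counter.
func runTenTasks(_ body: @escaping @Sendable (RacyCounter) async -> Void) async -> Int {
    let counter = RacyCounter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<10 {
            group.addTask { await body(counter) }
        }
    }
    return await counter.value
}

let racy = await runTenTasks { await $0.incrementAcrossSuspension() }
let safe = await runTenTasks { await $0.incrementAfterSuspension() }
print("racy:", racy, "safe:", safe)
```

The racy total can land anywhere from 1 to 10 depending on scheduling; the second total is always 10, because no other call can interleave between its read and its write.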

OK, if we're not crashing, all we need is to prevent that reentry into the next function, and we'll be done! Here's an attempt to ensure that only one task can enter the SharedIterator at a time:

actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {

    class Inner {
        var iterator: Sequence.AsyncIterator
        var busy = false
        var queue = [CheckedContinuation<Void, Never>]()

        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }

        func next() async rethrows -> Sequence.Element? {
            if busy {
                await withCheckedContinuation {
                    queue.append($0)
                }
            } else {
                busy = true
            }

            defer {
                if queue.isEmpty {
                    busy = false
                } else {
                    queue.remove(at: 0).resume()
                }
            }

            return try await iterator.next()
        }
    }

    let inner: Inner

    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }

    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }

}

Basically, when we enter next, if we're already busy with a previous invocation, queue up behind the currently executing task. When we finish calling the underlying iterator's next, we can resume the next task in the queue, or go back to idling if nobody's waiting.
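For comparison, here is a sketch of the same queueing idea with the bookkeeping moved into actor-isolated state instead of a non-Sendable class. AsyncGate and Tally are hypothetical names. Inner.next() is a non-isolated async method, so it runs off the actor, and concurrent calls can mutate busy and queue at the same time. Inside an actor, the check of busy and the append to the waiter queue cannot interleave with other calls. Note that this only fixes the bookkeeping; it does not by itself make it legal to call an iterator's mutating next() on actor state.

```swift
// Hypothetical FIFO gate whose state is protected by actor isolation.
actor AsyncGate {
    private var busy = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    func enter() async {
        guard busy else {
            busy = true               // gate was free: take it immediately
            return
        }
        // The closure runs synchronously on this actor before the task
        // suspends, so nothing can slip in between the check and the append.
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            waiters.append(continuation)
        }
        // leave() handed ownership to us directly; busy is still true.
    }

    func leave() {
        if waiters.isEmpty {
            busy = false              // nobody waiting: the gate becomes free
        } else {
            waiters.removeFirst().resume()   // hand ownership to the oldest waiter
        }
    }
}

// Usage: ten tasks each do read / suspend / write, but one at a time.
actor Tally {
    private(set) var value = 0
    func set(_ newValue: Int) { value = newValue }
}

func runGated() async -> Int {
    let gate = AsyncGate()
    let tally = Tally()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<10 {
            group.addTask {
                await gate.enter()
                let snapshot = await tally.value
                await Task.yield()
                await tally.set(snapshot + 1)
                await gate.leave()
            }
        }
    }
    return await tally.value
}

let gatedTotal = await runGated()
print(gatedTotal) // 10
```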

Uh-oh:

swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator   
lldb -- ./actor-iterator
Process 48036 launched: '/Users/***/Desktop/actor-iterator' (arm64)
Process 48036 stopped
* thread #2, queue = 'com.apple.root.default-qos.cooperative', stop reason = EXC_BAD_ACCESS (code=1, address=0x18)
    frame #0: 0x00000001ba2921d0 libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8> + 8
libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8>:
->  0x1ba2921d0 <+8>:  ldr    x0, [x0, #0x18]
    0x1ba2921d4 <+12>: ret    
    0x1ba2921d8 <+16>: adrp   x8, 265771
    0x1ba2921dc <+20>: add    x1, x8, #0xf28
Target 0: (actor-iterator) stopped.
(lldb) bt
* thread #2, queue = 'com.apple.root.default-qos.cooperative', stop reason = EXC_BAD_ACCESS (code=1, address=0x18)
  * frame #0: 0x00000001ba2921d0 libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8> + 8
    frame #1: 0x00000001ba292200 libswiftCore.dylib`Swift._StringObject.sharedUTF8.getter : Swift.UnsafeBufferPointer<Swift.UInt8> + 24
    frame #2: 0x00000001ba288dd0 libswiftCore.dylib`Swift._StringGuts.append(Swift._StringGutsSlice) -> () + 1032
    frame #3: 0x0000000238027e58 libswift_Concurrency.dylib`Swift.CheckedContinuation.resume(returning: __owned τ_0_0) -> () + 376
    frame #4: 0x0000000100007044 actor-iterator`Swift.CheckedContinuation.resume< where τ_0_0 == ()>() -> () + 16
    frame #5: 0x00000001000058ec actor-iterator`$defer<τ_0_0 where τ_0_0: Swift.AsyncSequence, τ_0_0.Element: Swift.Sendable>() -> () + 356
    frame #6: 0x0000000100005604 actor-iterator`(5) suspend resume partial function for main.SharedAsyncIterator.Inner.next() async throws -> Swift.Optional<τ_0_0.Element> + 76
    frame #7: 0x0000000100005c94 actor-iterator`(2) await resume partial function for main.SharedAsyncIterator.next() async throws -> Swift.Optional<τ_0_0.Element>
    frame #8: 0x0000000100006814 actor-iterator`(2) await resume partial function for closure #1 @Sendable () async -> Swift.Array<Swift.Int> in closure #1 (inout Swift.TaskGroup<Swift.Array<Swift.Int>>) async -> Swift.Array<Swift.Int> in main
    frame #9: 0x0000000100007540 actor-iterator`(1) await resume partial function for reabstraction thunk helper <τ_0_0 where τ_0_0: Swift.Sendable> from @escaping @callee_guaranteed @Sendable @async () -> (@out τ_0_0) to @escaping @callee_guaranteed @async () -> (@out τ_0_0, @error @owned Swift.Error)
    frame #10: 0x0000000100007680 actor-iterator`(1) await resume partial function for partial apply forwarder for reabstraction thunk helper <τ_0_0 where τ_0_0: Swift.Sendable> from @escaping @callee_guaranteed @Sendable @async () -> (@out τ_0_0) to @escaping @callee_guaranteed @async () -> (@out τ_0_0, @error @owned Swift.Error)

So, I've crashed horribly in completely "safe" swift, with strict-concurrency checks enabled. Something's fallen through the cracks somewhere; there should certainly be a compilation error somewhere in my code! Perhaps this is in some way what that original error was trying to prevent...

I can "fix" the problem with a lock and a class wrapper and a compiler warning telling me this won't continue to work:

import Foundation

actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {

    class Inner {
        var iterator: Sequence.AsyncIterator
        let lock: NSLock = NSLock()

        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }

        func next() async rethrows -> Sequence.Element? {
            lock.lock()
            defer { lock.unlock() }
            return try await iterator.next()
        }
    }

    let inner: Inner

    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }

    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }

}

swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator
actor-iterator.swift:15:18: warning: instance method 'lock' is unavailable from asynchronous contexts; Use async-safe scoped locking instead; this is an error in Swift 6
            lock.lock()
                 ^
Foundation.NSLock:11:15: note: 'lock()' declared here
    open func lock()
              ^
1000
1000

I'm left with a few questions for the tl;dr:

  • Am I right that "safe" swift, that compiles without error or warning with -strict-concurrency=complete, and does not call a buggy or unsafe API, should never experience a memory-unsafety crash?
  • Is the original error too narrow? Is it ever safe to call an async method on a non-Sendable self that's "protected state" of an actor?
  • How should I write an async iterator adapter that allows multiple consumers to consume a single async iterator?

Thanks @KeithBauerANZ for opening this. There are a couple of things here, so let's discuss them one by one.

Actor isolation warning

struct Counter {
    private var counter = 0
    mutating func increment() async {
        self.counter += 1
    }
}

actor Bar {
  private var counter = Counter()

  func bar() async {
    await self.counter.increment()
  }
}

The error this code produces (the same one you hit with iterator.next()) is completely correct, and it is also the reason you see your other behaviour. The access to self.counter is isolated and therefore thread safe, but the call to increment() is going to hop off the actor's executor and onto the concurrency pool. This means we are sending counter across an isolation domain here.
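One way to stay within this rule, sketched below under the assumption that the mutation itself does not need to suspend: do any awaiting first, then mutate the isolated state synchronously. This variant of the example above makes increment() synchronous, so counter.increment() runs in place on the actor and the compiler accepts it.

```swift
// Variant of Counter whose mutation is synchronous.
struct Counter {
    private(set) var count = 0
    mutating func increment() {
        count += 1
    }
}

actor Bar {
    private var counter = Counter()

    func bar() async {
        await Task.yield()       // any suspension happens before the mutation...
        counter.increment()      // ...which then runs synchronously on the actor
    }

    var count: Int { counter.count }
}

let bar = Bar()
for _ in 0..<5 {
    await bar.bar()
}
let finalCount = await bar.count
print(finalCount) // 5
```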

The workaround with the class

struct Counter {
    private var counter = 0
    mutating func increment() async {
        self.counter += 1
    }
}

final class Wrapper {
    private var counter = Counter()

    func increment() async {
        await self.counter.increment()
    }
}

actor Bar {
  private var counter = Wrapper()

  func bar() async {
    await self.counter.increment()
  }
}

Wrapping your iterator in a class hides the mutating effect from the compiler inside the actor. However, we are still sending the Wrapper instance across an isolation domain, so it must be Sendable. The compiler currently fails to diagnose this, and I filed an issue for it: "Sendable checking hole when actors call non-mutating async methods on non-Sendable types" (apple/swift issue #65315 on GitHub). cc @ktoso

How to consume an iterator from multiple tasks

This is a great question and, as always, it depends. The first thing you have to ask yourself is how you want to consume from multiple tasks: should all consumers get all the elements, should the elements be load-balanced across consumers, or should it be FIFO-based?

In the end, what we have to do is provide concrete async-algorithms for each of those, similar to the Fan-Out operators of Akka. The implementation of these algorithms isn't trivial right now. They have to do a weird dance where they consume the iterator inside an unstructured Task and relay the demand through a state machine behind a lock using continuations.

This is more complicated than necessary and requires careful use of a lock and continuations to make sure back pressure is upheld and no continuation is dropped. In theory this could be easier to implement if we get more language features such as generators, but until then we have to resort to the unstructured task approach. Please feel free to file an issue in async-algorithms with the concrete fan-out behaviour that you want.
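To make the load-balanced case concrete, here is a rough sketch in the spirit of that description, with an actor standing in for the lock-protected state machine. All names (LoadBalancer, shareLoadBalanced, Numbers) are hypothetical, errors thrown by the base sequence simply end the stream, cancellation is not handled, and if consumers stop calling next() the producer task stays suspended. The key property is that the base iterator lives only inside one detached task, so it is never shared, and the producer pulls a new element only once a consumer is waiting.

```swift
// Hypothetical load balancer: each element goes to exactly one waiting consumer.
actor LoadBalancer<Element: Sendable> {
    private var consumers: [CheckedContinuation<Element?, Never>] = [] // FIFO
    private var producer: CheckedContinuation<Void, Never>?           // parked producer
    private var finished = false

    /// Consumer side: suspends until an element is delivered or the stream ends.
    func next() async -> Element? {
        if finished { return nil }
        return await withCheckedContinuation { (continuation: CheckedContinuation<Element?, Never>) in
            consumers.append(continuation)
            producer?.resume()        // signal demand to a parked producer
            producer = nil
        }
    }

    /// Producer side: returns once at least one consumer is waiting (back pressure).
    func waitForDemand() async {
        if !consumers.isEmpty { return }
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            producer = continuation
        }
    }

    /// Producer side: hands an element to the oldest consumer, or ends the stream.
    func deliver(_ element: Element?) {
        guard let element = element else {
            finished = true
            for consumer in consumers { consumer.resume(returning: nil) }
            consumers.removeAll()
            return
        }
        // waitForDemand() guaranteed a consumer, and only deliver() removes them.
        consumers.removeFirst().resume(returning: element)
    }
}

/// Starts a detached producer task that owns the base iterator exclusively.
func shareLoadBalanced<Base>(_ base: Base) -> LoadBalancer<Base.Element>
where Base: AsyncSequence & Sendable, Base.Element: Sendable {
    let balancer = LoadBalancer<Base.Element>()
    Task.detached {
        var iterator = base.makeAsyncIterator()   // never shared with another task
        while true {
            await balancer.waitForDemand()
            let element = try? await iterator.next()  // simplification: errors end the stream
            await balancer.deliver(element)
            if element == nil { break }
        }
    }
    return balancer
}

// A small stand-in for AsyncThousand.
struct Numbers: AsyncSequence, Sendable {
    typealias Element = Int
    let count: Int
    struct AsyncIterator: AsyncIteratorProtocol {
        let count: Int
        var current = 0
        mutating func next() async -> Int? {
            await Task.yield()
            guard current < count else { return nil }
            defer { current += 1 }
            return current
        }
    }
    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(count: count) }
}

func consumeInParallel() async -> [Int] {
    let shared = shareLoadBalanced(Numbers(count: 1000))
    return await withTaskGroup(of: [Int].self) { group in
        for _ in 0..<10 {
            group.addTask {
                var mine: [Int] = []
                while let value = await shared.next() { mine.append(value) }
                return mine
            }
        }
        var all: [Int] = []
        for await part in group { all.append(contentsOf: part) }
        return all
    }
}

let fanned = await consumeInParallel()
print(fanned.count, Set(fanned).count) // 1000 1000
```

Because each value is handed to exactly one consumer, there are no duplicates, unlike the shared-iterator attempts in the original post. A real implementation would also need cancellation handling and error propagation.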

This is definitely a bug in the compiler, because SE-0306 specifies that actor-isolated state cannot be passed inout to an async operation.

Doug

I agree that it is a bug in the compiler, but it is not a bug in the inout passing, since the reference to self of the Wrapper class is not passed inout, AFAIK. It is a far wider-reaching problem: any non-Sendable type with an async method would have to be Sendable whenever that method is called from an actor, since any call to that async method potentially involves a hop. We definitely need a way out of this, because it will probably cause massive amounts of correct warnings without a real way to fix them.

From my experience there are two spellings that we really need to give users more control over where their code is running.

First, a way for async methods to state that they want to inherit the caller's executor. This is similar to the current @_unsafeInheritExecutor, but without its unsafety. This is something we would want to mark methods like withCheckedContinuation with, and potentially also all the next() methods of async iterators. Second, a way to override the global executor for a scope, which allows the caller to control where the execution happens.

For now we should introduce the warning for the issue and then deal with the fallout.

I've been thinking about this, and the more I think about it, the more I find the idea that any await in an actor jumps back to the "global concurrency pool" completely counterintuitive.

If I have an object which is "protected state" of an actor, and I call an async method on that object, "of course" I want to do that method "on the actor".

Obviously that doesn't hold up if the method requires a particular global actor, but I feel like that's an understandable exception to the general concept, that once on an actor, you don't leave except via a @Sendable closure...

I guess the problem is that when you are in a @MainActor function you don't want more work than necessary to run on the main actor, but I still find the idea that calling any async function can implicitly yoink you off the main actor quite unintuitive. And if this behavior absolutely had to be preserved, the idea that methods on protected state of an actor retain the actor would still be more intuitive than the current "any await always leaves the declared actor".

Is there any scope here to change the rules for jumping off an actor to make it work more intuitively, rather than simply adding a warning/error to largely forbid the use of async from actors?

It also occurs to me that fixing this prevents the other footgun in my post: the fact that actor isolation doesn't currently prevent me from reentering AsyncIterator.next(). The "escape hatch" of putting the iterator into a class will cease to work, as that class would now have to be Sendable to be usable from the actor, and most AsyncIterator implementations are not Sendable.

It makes me think types with mutating async methods should not be eligible for automatic Sendable conformance, which would prevent this footgun in more places.
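For context on the current rule (SE-0302): a non-public struct whose stored properties are all Sendable is implicitly Sendable, so an iterator shaped like AsyncThousand.AsyncIterator from the original post already conforms even though its next() is a mutating async method. The isSendable helper below is hypothetical; its body is trivial, and the real check is that the call only compiles when the argument type conforms.

```swift
// Same shape as AsyncThousand from the original post.
struct AsyncThousand: AsyncSequence, Sendable {
    typealias Element = Int
    struct AsyncIterator: AsyncIteratorProtocol {
        var value = 0
        mutating func next() async -> Int? {
            let v = value
            await Task.yield()
            guard v < 1000 else { return nil }
            value = v + 1
            return v
        }
    }
    func makeAsyncIterator() -> AsyncIterator { AsyncIterator() }
}

// Compiles only if T conforms to Sendable (explicitly or implicitly).
func isSendable<T: Sendable>(_: T.Type) -> Bool { true }

let iteratorIsSendable = isSendable(AsyncThousand.AsyncIterator.self)
print(iteratorIsSendable) // true
```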
