So, I was trying to create a wrapper that would allow me to iterate a single async iterator from multiple concurrent tasks. I tried a naïve approach:
actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {
    var iterator: Sequence.AsyncIterator
    init(iterating sequence: Sequence) {
        self.iterator = sequence.makeAsyncIterator()
    }
    func next() async rethrows -> Sequence.Element? {
        try await iterator.next()
    }
}
This immediately fails with a compiler error I hadn't seen before:
swiftc -strict-concurrency=complete actor-iterator.swift
actor-iterator.swift:11:28: error: cannot call mutating async function 'next()' on actor-isolated property 'iterator'
try await iterator.next()
^
It seems this error exists because the call could let the actor-protected value in iterator be mutated outside the actor, since actor isolation ends at await? I've read several threads about this, though, and I'm not certain I've yet read or understood the "actual" reason for this error...
In any case, one can dodge the error easily enough:
actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {
    class Inner {
        var iterator: Sequence.AsyncIterator
        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }
        func next() async rethrows -> Sequence.Element? {
            try await iterator.next()
        }
    }
    let inner: Inner
    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }
    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }
}
This compiles without error. I find that concerning: it's exactly equivalent to the previous code (except with an additional allocation); whatever behavior the previous error was trying to prevent is certainly present in this code.
Let's try to break it. Here's a little async sequence that uses yield to pretend it's doing something actually asynchronous:
struct AsyncThousand: AsyncSequence, Sendable {
    typealias Element = Int
    struct AsyncIterator: AsyncIteratorProtocol {
        var value = 0
        mutating func next() async -> Int? {
            let v = value
            await Task.yield()
            if v < 1000 {
                value = v + 1
                return v
            } else {
                return nil
            }
        }
    }
    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator()
    }
}
And here's a little test harness that uses my SharedAsyncIterator above to iterate this sequence from ten concurrent tasks:
let sharedIterator = SharedAsyncIterator(iterating: AsyncThousand())
let allResults = await withTaskGroup(of: [Int].self) { group in
    for _ in 0..<10 {
        group.addTask {
            var results = [Int]()
            while let result = await sharedIterator.next() {
                results.append(result)
            }
            return results
        }
    }
    var allResults = [Int]()
    for await results in group {
        allResults.append(contentsOf: results)
    }
    return allResults
}
print(allResults.count)
print(Set(allResults).count)
Running this, we can see we have problems, though no crashes:
swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator
1505
1000
505 elements of the AsyncThousand stream have been duplicated, because the next method on the iterator was "reentered" during its yield.
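The same read, suspend, write pattern can be reproduced in a much smaller setting. This is only a sketch: Counter, incrementSlowly and runReentrancyDemo are hypothetical names made up for illustration, and the exact total is not deterministic. It assumes the file is compiled as main.swift so that top-level await is available.

```swift
// Hypothetical actor, for illustration only.
actor Counter {
    private var value = 0

    func incrementSlowly() async {
        let v = value        // read the current state
        await Task.yield()   // suspension point: other calls may run on the actor here
        value = v + 1        // write back a value that may now be stale
    }

    func current() -> Int { value }
}

func runReentrancyDemo() async -> Int {
    let counter = Counter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask { await counter.incrementSlowly() }
        }
    }
    return await counter.current()
}

let total = await runReentrancyDemo()
print(total) // never more than 100; may be lower when increments interleave
```

Even though every access to value happens on the actor, calls can interleave at the await, which is the same shape as the duplicated elements above.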
OK, if we're not crashing, all we need is to prevent that reentry into the next function, and we'll be done! Here's an attempt to ensure that only one task can enter the SharedAsyncIterator at a time:
actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {
    class Inner {
        var iterator: Sequence.AsyncIterator
        var busy = false
        var queue = [CheckedContinuation<Void, Never>]()
        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }
        func next() async rethrows -> Sequence.Element? {
            if busy {
                await withCheckedContinuation {
                    queue.append($0)
                }
            } else {
                busy = true
            }
            defer {
                if queue.isEmpty {
                    busy = false
                } else {
                    queue.remove(at: 0).resume()
                }
            }
            return try await iterator.next()
        }
    }
    let inner: Inner
    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }
    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }
}
Basically, when we enter next, if we're already busy with a previous invocation, we queue up behind the currently executing task. When we finish calling the underlying iterator's next, we can resume the next task in the queue, or go back to idling if nobody's waiting.
Uh-oh:
swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator
lldb -- ./actor-iterator
Process 48036 launched: '/Users/***/Desktop/actor-iterator' (arm64)
Process 48036 stopped
* thread #2, queue = 'com.apple.root.default-qos.cooperative', stop reason = EXC_BAD_ACCESS (code=1, address=0x18)
frame #0: 0x00000001ba2921d0 libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8> + 8
libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8>:
-> 0x1ba2921d0 <+8>: ldr x0, [x0, #0x18]
0x1ba2921d4 <+12>: ret
0x1ba2921d8 <+16>: adrp x8, 265771
0x1ba2921dc <+20>: add x1, x8, #0xf28
Target 0: (actor-iterator) stopped.
(lldb) bt
* thread #2, queue = 'com.apple.root.default-qos.cooperative', stop reason = EXC_BAD_ACCESS (code=1, address=0x18)
* frame #0: 0x00000001ba2921d0 libswiftCore.dylib`Swift._StringObject.getSharedUTF8Start() -> Swift.UnsafePointer<Swift.UInt8> + 8
frame #1: 0x00000001ba292200 libswiftCore.dylib`Swift._StringObject.sharedUTF8.getter : Swift.UnsafeBufferPointer<Swift.UInt8> + 24
frame #2: 0x00000001ba288dd0 libswiftCore.dylib`Swift._StringGuts.append(Swift._StringGutsSlice) -> () + 1032
frame #3: 0x0000000238027e58 libswift_Concurrency.dylib`Swift.CheckedContinuation.resume(returning: __owned τ_0_0) -> () + 376
frame #4: 0x0000000100007044 actor-iterator`Swift.CheckedContinuation.resume< where τ_0_0 == ()>() -> () + 16
frame #5: 0x00000001000058ec actor-iterator`$defer<τ_0_0 where τ_0_0: Swift.AsyncSequence, τ_0_0.Element: Swift.Sendable>() -> () + 356
frame #6: 0x0000000100005604 actor-iterator`(5) suspend resume partial function for main.SharedAsyncIterator.Inner.next() async throws -> Swift.Optional<τ_0_0.Element> + 76
frame #7: 0x0000000100005c94 actor-iterator`(2) await resume partial function for main.SharedAsyncIterator.next() async throws -> Swift.Optional<τ_0_0.Element>
frame #8: 0x0000000100006814 actor-iterator`(2) await resume partial function for closure #1 @Sendable () async -> Swift.Array<Swift.Int> in closure #1 (inout Swift.TaskGroup<Swift.Array<Swift.Int>>) async -> Swift.Array<Swift.Int> in main
frame #9: 0x0000000100007540 actor-iterator`(1) await resume partial function for reabstraction thunk helper <τ_0_0 where τ_0_0: Swift.Sendable> from @escaping @callee_guaranteed @Sendable @async () -> (@out τ_0_0) to @escaping @callee_guaranteed @async () -> (@out τ_0_0, @error @owned Swift.Error)
frame #10: 0x0000000100007680 actor-iterator`(1) await resume partial function for partial apply forwarder for reabstraction thunk helper <τ_0_0 where τ_0_0: Swift.Sendable> from @escaping @callee_guaranteed @Sendable @async () -> (@out τ_0_0) to @escaping @callee_guaranteed @async () -> (@out τ_0_0, @error @owned Swift.Error)
So, I've crashed horribly in completely "safe" Swift, with strict-concurrency checks enabled. Something's fallen through the cracks somewhere; there should certainly be a compilation error somewhere in my code! Perhaps this is in some way what that original error was trying to prevent...
I can "fix" the problem with a lock and a class wrapper and a compiler warning telling me this won't continue to work:
actor SharedAsyncIterator<Sequence: AsyncSequence>
where Sequence.Element: Sendable {
    class Inner {
        var iterator: Sequence.AsyncIterator
        let lock: NSLock = NSLock()
        init(iterator: Sequence.AsyncIterator) {
            self.iterator = iterator
        }
        func next() async rethrows -> Sequence.Element? {
            lock.lock()
            defer { lock.unlock() }
            return try await iterator.next()
        }
    }
    let inner: Inner
    init(iterating sequence: Sequence) {
        self.inner = Inner(iterator: sequence.makeAsyncIterator())
    }
    func next() async rethrows -> Sequence.Element? {
        try await inner.next()
    }
}
swiftc -strict-concurrency=complete actor-iterator.swift && ./actor-iterator
actor-iterator.swift:15:18: warning: instance method 'lock' is unavailable from asynchronous contexts; Use async-safe scoped locking instead; this is an error in Swift 6
lock.lock()
^
Foundation.NSLock:11:15: note: 'lock()' declared here
open func lock()
^
1000
1000
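For reference, my reading of "async-safe scoped locking" in that warning is the closure-based withLock on NSLock, where the lock is taken and released inside one synchronous call. A minimal sketch, assuming Foundation's NSLock.withLock is available on the target platform; LockedCounter and runLockedDemo are hypothetical names:

```swift
import Foundation

// Hypothetical helper: the lock guards `value`, and is only ever held
// for the duration of a synchronous closure.
final class LockedCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var value = 0

    func increment() {
        lock.withLock { value += 1 }
    }

    func current() -> Int {
        lock.withLock { value }
    }
}

func runLockedDemo() async -> Int {
    let counter = LockedCounter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<1000 {
            group.addTask { counter.increment() }
        }
    }
    return counter.current()
}

let lockedTotal = await runLockedDemo()
print(lockedTotal)
```

Because the closure passed to withLock is synchronous, the lock can't be held across an await, so this form doesn't directly cover the call to iterator.next() that I actually need to protect.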
I'm left with a few questions for the tl;dr:
- Am I right that "safe" Swift, that compiles without error or warning with -strict-concurrency=complete, and does not call a buggy or unsafe API, should never experience a memory-unsafety crash?
- Is the original error too narrow? Is it ever safe to call an async method on a non-Sendable self that's "protected state" of an actor?
- How should I write an async iterator adapter that allows multiple consumers to consume a single async iterator?
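As a possible starting point for that last question, here is a minimal sketch of the same "queue up behind the current task" idea, but with the busy flag and the waiter queue kept in actor-isolated state rather than in a plain class. AsyncGate, Tally and runGatedDemo are hypothetical names, not from any library. The sketch ignores cancellation, and it only shows the serialising part: it does not address where to store a non-Sendable iterator.

```swift
// Hypothetical gate: at most one task is "inside" between acquire() and release().
actor AsyncGate {
    private var busy = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    func acquire() async {
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            // The check and the enqueue happen together, on the actor.
            if busy {
                waiters.append(continuation)
            } else {
                busy = true
                continuation.resume()
            }
        }
    }

    func release() {
        if waiters.isEmpty {
            busy = false
        } else {
            // Hand the gate straight to the next waiter; busy stays true.
            waiters.removeFirst().resume()
        }
    }
}

// Hypothetical client with a read, suspend, write sequence guarded by the gate.
actor Tally {
    private let gate = AsyncGate()
    private var value = 0

    func incrementSlowly() async {
        await gate.acquire()
        let v = value
        await Task.yield()
        value = v + 1
        await gate.release()
    }

    func current() -> Int { value }
}

func runGatedDemo() async -> Int {
    let tally = Tally()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask { await tally.incrementSlowly() }
        }
    }
    return await tally.current()
}

let gatedTotal = await runGatedDemo()
print(gatedTotal)
```

With the gate in place I would expect all 100 increments to land, since no second call can read value until the first has written it back.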