Unclear Actor Isolation in AsyncStream

Hey folks,

I'm looking to use AsyncStream, but I've run into some issues with two possible approaches. I'll paste the code snippet with relevant comments below.

This problem resembles this outstanding issue (`Task { [weak self] in ... }` loses its actor context when called in non-global actor method · Issue #62604 · swiftlang/swift · GitHub), however this appearing in an AsyncStream adds an additional layer of confusion.

class MyClass {
    func doNonisolatedWork(with closure: @escaping () -> Void) { }
}

actor MyActor {
    let nonisolatedClass = MyClass()
    var mainActorClosure: @MainActor () -> Int = { 1 }
    var actorProperty: Int = 0
    var stream: AsyncStream<Int>?

    // APPROACH #1: Assign property directly
    // This error follows my understanding of Swift concurrency: `doNonisolatedWork` closure cannot access actor-isolated properties in a non-isolated context
    func storedAsyncStreamProperty() {
        stream = AsyncStream { continuation in
            nonisolatedClass.doNonisolatedWork { [weak self] in
                guard let self else { return }
                if actorProperty > 0 { // ❌ Actor-isolated property 'actorProperty' can not be referenced from a nonisolated context
                    Task {
                        let value = await self.mainActorClosure()
                        continuation.yield(value)
                    }
                } else {
                    continuation.finish()
                }
            }
        }
        Task {
            guard let stream = self.stream else { return }
            for await _ in stream {
                // do work with return value here
            }
        }
    }

    // APPROACH #2: Create a computed property and store the value later
    // It doesn't make sense to me that the compiler allows `doNonisolatedWork` closure to access `actorProperty` in non-isolated context.
    // Even though this compiles, `mainActorClosure` executes on a concurrent thread, despite its @MainActor attribute
    func computedAsyncStreamProperty() {
        var stream: AsyncStream<Int> {
            AsyncStream { continuation in
                nonisolatedClass.doNonisolatedWork { [weak self] in
                    guard let self else { return }
                    if actorProperty > 0 {
                        Task {
                            let value = await self.mainActorClosure() // ⚠️ No 'async' operations occur within 'await' expression
                            continuation.yield(value)
                        }
                    } else {
                        continuation.finish()
                    }
                }
            }
        }
        self.stream = stream
        Task {
            guard let stream = self.stream else { return }
            for await _ in stream {
                // do work with return value here
            }
        }
    }
}

hi there, and welcome to the Swift forums. this is not really a direct answer to your question, but i want to point out that i think you may have identified a data race safety hole. specifically the issue appears to be with that local computed variable pattern.

here is a smaller example that highlights the problem:

final class NS {
    var a = [Int]()
    func read() {
        for _ in 1...100 {
            let x = a.last
            _ = x
        }
    }
    func write() {
        for _ in 1...100 {
            a.append(0)
        }
    }
}

actor A {
    let ns = NS()

    func g() {
        var race: () {
            Task.detached {
                let ns = self.ns // should presumably not be allowed
                ns.read()
            }

            Task.detached {
                let ns = self.ns
                ns.write()
            }
        }
        let _: () = race
    }
}

i encourage you to file a bug on the github repo

2 Likes

thanks Jamie!

filed bug report here

3 Likes