Suspending/resuming multiple tasks by using continuations

The compiles:

func suspendResumeTask () async throws {
    var contv: [CheckedContinuation <Void, Never>] = []
    
    let i = 0
    Task {
        await withCheckedContinuation {
            contv.append ($0)
            print ("\(i) suspending")
        }
        print ("\(i) resumed")
    }
    
    try await Task.sleep(until: .now + .seconds(7))
    for cont in contv {
        cont.resume()
    }
}

But, this one does not:

func suspendResumeTasks () async throws {
    var contv: [CheckedContinuation <Void, Never>] = []
    
    for i in 0..<1 {
        Task {
            await withCheckedContinuation {
                contv.append ($0)
                print ("\(i) suspending")
            }
            print ("\(i) resumed")
        }
    }
    
    try await Task.sleep(until: .now + .seconds(7))
    for cont in contv {
        cont.resume()
    }

What's being transferred here?

Error: Value of non-Sendable type ... accessed after being transferred; later accesses could race
Actual Error
Error: Value of non-Sendable type '@isolated(any) @async @callee_guaranteed @substituted <τ_0_0> () -> @out τ_0_0 for <()>' accessed after being transferred; later accesses could race

Also, am I doing something wrong?

1 Like

I don't think this should compile either. You have a mutable array of continuations, being passed to a task (requiring it to be in a disconnected region) and mutated, plus used after. Seems like another issue with Task diagnostic to me. The error cryptic here, it is contv that is being captured in an unsafe way.

1 Like

Here is the revised version using the AsyncStream.

func suspendResumeTasks (M: Int = 5) async throws {
    let chan = Channel <CheckedContinuation <Void, Never>> ()
    
    for i in 0..<M {
        Task {
            await withCheckedContinuation {
                chan.send ($0)
                print ("\(i) suspending")
            }
            print ("\(i) resumed")
        }
    }

    var n = 0
    for await cont in chan.stream {
        cont.resume()
        n += 1
        if n == M {
            break
        }
    }
}

struct Channel <T>: Sendable where T:Sendable {
    let stream : AsyncStream <T>
    let cont   : AsyncStream <T>.Continuation
    
    init () {
        let u = Self.makeStreamAndCont ()
        self.stream = u.0
        self.cont   = u.1
    }
    
    func send (_ u: T?) {
        if let u {
            cont.yield(u)
        }
        else {
            cont.finish()
        }
    }
    
    static func makeStreamAndCont () -> (AsyncStream <T>, AsyncStream <T>.Continuation) {
        let t = AsyncStream <T>.makeStream()
        return t
    }
}

1 Like

JFYI AsyncStream has its own makeStream method that returns steam and continuation.

1 Like

agreed. this looks like the same data race safety hole issue raised on GH and in this recent post.

While I don't know the underlying reason and it could be the same, the conditions in this case are at least slightly different. For me the diagnostics were correct when child task was not isolated to any global actor, but were missing when I specified one. Here there is no mention of any global actors, and warnings are still missing.

That was the function I looked for in the AsyncStream doc, before rolling my own, but I missed it.

I will edit my post to put that in.

Thank you.