Passing AsyncIterator to an async method

I'm experimenting with withTimeout(in:), and it's raised an interesting question about passing mutable values to async functions, and specifically passing AsyncIterators.

I have a custom AsyncSequence called LedgerResponseSequence. It is a sequence of responses to commands I send on another channel, and wraps a FileHandle:

let ledgerOutput = readPipe.fileHandleForReading
var responses = LedgerResponseSequence(reader: ledgerOutput).makeAsyncIterator()

In a loop, I consume commands from an AsyncChannel, send them, and then wait for a response and send it to the caller on a one-shot AsyncChannel (like I would in Go):

for try await command in commandChannel {
    if Task.isCancelled { ... fail command and return ... }
    
    sendCommand(command)    

    // Wait for response
    do {
        guard let response = try await responses.next() else { throw CancellationError() }
        await command.response.send(response)
        command.response.finish()
    } catch {
        await command.response.fail(error)
    }
}

So far, so good. Then I naively added withTimeout:

do {
    // Wrap previous code with a timeout
    try await withTimeout(in: .seconds(2)) {
        guard let response = try await responses.next() else {
            throw CancellationError()
        }
        // etc.
    }
}

Unsurprisingly, this fails to compile with "Mutation of captured var 'responses' in concurrently-executing code". That's probably fine and correct.
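For context, here is a rough sketch of why the compiler objects. It assumes withTimeout(in:) is built on a task group and takes a @Sendable closure; the real implementation isn't shown in this thread, so the signature, the TimeoutError type and the body below are all hypothetical.

```swift
// Hypothetical error type; the real withTimeout may throw something else.
struct TimeoutError: Error {}

// Assumed shape of withTimeout(in:). The operation runs in a child task,
// so the closure must be @Sendable, and a @Sendable closure is not allowed
// to mutate a captured var such as `responses`.
func withTimeout<T: Sendable>(
    in duration: Duration,
    _ operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        // Child 1: the caller's work.
        group.addTask { try await operation() }
        // Child 2: the timer, which throws once the duration has elapsed.
        group.addTask {
            try await Task.sleep(for: duration)
            throw TimeoutError()
        }
        // Whichever child finishes first decides the outcome; cancel the other.
        defer { group.cancelAll() }
        return try await group.next()!
    }
}
```

With this sketch, try await withTimeout(in: .seconds(2)) { 42 } returns 42, while an operation that outlives the duration fails with TimeoutError. Cancellation is cooperative, so an operation that ignores it still keeps the group waiting until it finishes.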

The question is the correct way to deal with this. My current solution is to hand-roll an inout parameter by returning a mutated copy of the AsyncIterator:

responses = try await withTimeout(in: .seconds(2)) { [responses] in
    var responses = responses
    // ... etc ... mutating responses
    return responses
}

This works, but is it correct? (It technically also "works" if I just mutate a copy and don't bother returning it, but that feels like relying on implementation details of this particular iterator.)
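To make the "implementation details" worry concrete, here is a minimal sketch with two hypothetical iterators (neither is LedgerResponseSequence). One keeps its state inside the struct, the other keeps it in a shared object, roughly as an iterator wrapping a FileHandle would. Whether advancing a copy also advances the original depends entirely on which kind you have.

```swift
// Hypothetical value-type iterator: all state lives in the struct itself.
struct CountingIterator: AsyncIteratorProtocol {
    var current = 0
    mutating func next() async -> Int? {
        current += 1
        return current
    }
}

// Hypothetical reference-backed iterator: the state lives in a shared object,
// so every copy of the struct points at the same cursor.
final class SharedCursor {
    var current = 0
}

struct SharedIterator: AsyncIteratorProtocol {
    let cursor = SharedCursor()
    mutating func next() async -> Int? {
        cursor.current += 1
        return cursor.current
    }
}

// Advances a copy of each iterator twice, then asks the original for its next
// element, and returns what the originals produced.
func compareCopies() async -> (fromValue: Int?, fromShared: Int?) {
    var original = CountingIterator()
    var copy = original
    _ = await copy.next()
    _ = await copy.next()
    // The original never saw the copy's advances, so it starts from scratch.
    let fromValue = await original.next()

    var shared = SharedIterator()
    var sharedCopy = shared
    _ = await sharedCopy.next()
    _ = await sharedCopy.next()
    // The copy advanced the shared cursor, so the original continues after it.
    let fromShared = await shared.next()

    return (fromValue, fromShared)
}
```

Returning the mutated copy and assigning it back, as in the snippet above, behaves the same for both kinds, which is why it is the safer habit when you don't control the iterator's implementation.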

This is an interesting question which @Philippe_Hausler and I thought about a bit already. My current take is that AsyncIterators should not be Sendable since they are the connection between the AsyncSequence and the consuming Task. As soon as you make the iterator Sendable it can be consumed by multiple tasks at once and the AsyncSequence can no longer guarantee its behaviour, e.g. whether it is unicast or multicast.

However, use cases such as yours are valid and we should be able to solve them, but we are missing a feature here: the combination of Sendable and move-only types. Almost all iterators that I have implemented or come across so far want to be move-only types. Once we get this combination, we can safely send them across Tasks.

What would move-only look like in this scenario? Would it allow the equivalent of inout on the closure? This is how I originally tried to write it, but it unsurprisingly doesn't work currently:

// inout moves responses to closure until it completes, at which point it is moved back
// `responses` is not permitted to escape, and `responses` must be moveable.
try await withTimeout(in: .seconds(2)) { [inout responses] in
    // ... etc ... mutating responses

    // No need to return a value; will be copied back
}

That is a very good point and something I just brought up in the borrow/take review.
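As a rough illustration of the "move in, move back" idea, here is a sketch using the noncopyable types (~Copyable) and consuming parameters that Swift 5.9 added. Cursor and both functions are hypothetical, and Cursor is not an AsyncIterator. It only shows a value being handed to a function and returned, without a copy ever existing; it does not show the [inout responses] capture syntax, which remains hypothetical.

```swift
// Hypothetical move-only cursor: the compiler rejects any attempt to copy it.
struct Cursor: ~Copyable {
    var position = 0

    mutating func advance() -> Int {
        position += 1
        return position
    }
}

// Takes ownership of the cursor, mutates it, and hands ownership back.
func advanceTwice(_ cursor: consuming Cursor) -> Cursor {
    _ = cursor.advance()
    _ = cursor.advance()
    return cursor
}

func moveThroughFunction() -> Int {
    var cursor = Cursor()
    // `cursor` is moved into the call and cannot be used until it is
    // reinitialized from the returned value.
    cursor = advanceTwice(cursor)
    return cursor.position
}
```

This is the same shape as the hand-rolled version earlier in the thread (pass the value in, assign the result back), except that the compiler enforces that no second copy can be used in the meantime.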