Passing AsyncIterator to an async method

I'm experimenting with withTimeout(in:), and it's raised an interesting question about passing mutable values to async functions, and specifically passing AsyncIterators.

I have a custom AsyncSequence called LedgerResponseSequence. It is a sequence of responses to commands I send on another channel, and wraps a FileHandle:

let ledgerOutput = readPipe.fileHandleForReading
var responses = LedgerResponseSequence(reader: ledgerOutput).makeAsyncIterator()

In a loop, I consume commands from an AsyncChannel, send them, and then wait for a response and send it to the caller on a one-shot AsyncChannel (like I would in Go):

for try await command in commandChannel {
    if Task.isCancelled { ... fail command and return ... }
    
    sendCommand(command)    

    // Wait for response
    do {
        guard let response = try await responses.next() else { throw CancellationError() }
        await command.response.send(response)
        command.response.finish()
    } catch {
        await command.response.fail(error)
    }
}

So far, so good. Then I naively added withTimeout:

do {
    // Wrap previous code with a timeout
    try await withTimeout(in: .seconds(2)) {
        guard let response = try await responses.next() else {
            throw CancellationError()
        }
        // etc.
    }
}

Unsurprisingly, this fails to compile with Mutation of captured var 'responses' in concurrently-executing code. That's probably fine and correct.

The question is the correct way to deal with this. My current solution is to hand-roll an inout parameter by returning a mutated copy of the AsyncIterator:

responses = try await withTimeout(in: .seconds(2)) { [responses] in
    var responses = responses
    // ... etc ... mutating responses
    return responses
}

This works, but is it correct? (It technically also "works" if I just mutate a copy and don't both returning it, but that feels like relying on implementation details of this particular iterator.)

1 Like

This is an interesting question which @Philippe_Hausler and I thought about a bit already. My current take is that AsyncIterators should not be Sendable since they are the connection between the AsyncSequence and the consuming Task. As soon as you make the iterator Sendable it can be consumed by multiple tasks at once and the AsyncSequence cannot guarantee its behaviour anymore, e.g. if it is unicast or multicast.

However, use-cases such as yours are valid and we should be able to solve them but we are missing a feature here which is the combination of Sendable and move-only types. Almost all iterators that I implemented or came across by now want to become move-only types. Once we get this combination we can safely send them across Tasks.

6 Likes

What would move-only look like in this scenario? Would it allow the equivalent of inout on the closure? This is how I originally tried to write it, but it unsurprisingly doesn't work currently

// inout moves responses to closure until it completes, at which point it is moved back
// `responses` is not permitted to escape, and `responses` must be moveable.
try await withTimeout(in: .seconds(2)) { [inout responses] in
    // ... etc ... mutating responses

    // No need to return a value; will be copied back
}

That is a very good point and something I just brought up in the borrow/take review.

I've run into a similar situation.

I've built a toAsyncStream and toAsyncThrowingStream to mirror Combines eraseToAnyPublisher

The implementation is like so:

fileprivate extension AsyncStream {
    init<Base: AsyncSequence>(from sequence: Base, file: StaticString = #file, line: UInt = #line) where Element == Base.Element, Element: Sendable {
        var iterator = sequence.makeAsyncIterator()
        // FIXME: In later swift versions, AsyncSequence protocol will likely have an associated error type.
        // FIXME: For now, produce an assertionFailure to let developer know to use an AsyncThrowingStream instead.
        self.init {
            do {
                return try await iterator.next()
            } catch {
                assertionFailure("warning: Base AsyncSequence threw an error. Use AsyncThrowingStream instead", file: file, line: line)
                return nil
            }
        }
    }
}

fileprivate extension AsyncThrowingStream {
    init<Base: AsyncSequence>(from sequence: Base) where Element == Base.Element, Element: Sendable, Failure == any Error {
        var iterator = sequence.makeAsyncIterator()
        self.init {
            try await iterator.next()
        }
    }
}

extension AsyncSequence where Element: Sendable {
    /// Type erases the `AsyncSequence` into an `AsyncStream`
    /// - Returns: An `AsyncStream` created from the base `AsyncSequence`
    ///
    /// - Note: AsyncSequences do not expose their error type.
    /// So this function is available for both throwing and non-throwing `AsyncSequences`.
    /// It will produce an `assertionFailure` at runtime if the base sequence throws.
    func asAsyncStream(file: StaticString = #filePath, line: UInt = #line) -> AsyncStream<Element> {
        AsyncStream(from: self, file: file, line: line)
    }

    /// Type erases the `AsyncSequence` into an `AsyncThrowingStream`
    /// - Returns: An `AsyncThrowingStream` from the base `AsyncSequence`
    func asAsyncThrowingStream() -> AsyncThrowingStream<Element, any Error> {
        AsyncThrowingStream(from: self)
    }
}

And I get the same Mutation of captured var 'iterator' in concurrently-executing code; this is an error in the Swift 6 language mode warning.

I just learned AsyncStream has an initializer that takes in an async sequence, so I can delete all my code :joy:

@BrentM AsyncStream does not have an initializer that takes an AsyncSequence as parameter. Maybe you have GitHub - pointfreeco/swift-concurrency-extras: Useful, testable Swift concurrency. as a dependency?

your right. I am using swift-dependencies, which I believe has that bundled with it