AsyncSequence uniqued by key

I'm new to asynchronous programming, so forgive me if this is an obvious question. I'm working on a network client library for a protocol where responses, each keyed by the integer handle of its request, may arrive out of order.

Currently I'm doing something like this (code simplified for brevity):

@MainActor
private func response(for handle: OcaUint32) async throws -> Ocp1Response {
    let deadline = Date() + responseTimeout
    repeat {
        for await response in monitor.channel.filter({ $0.handle == handle }) {
            return response
        }
    } while Date() < deadline
    throw Ocp1Error.responseTimeout
}

@MainActor
func sendCommandRrq(_ command: Ocp1Command) async throws -> Ocp1Response {
    try await sendMessage(command, type: .ocaCmdRrq)
    return try await response(for: command.handle)
}

The problem, of course, is that each caller's loop consumes responses from the shared channel before the filter discards the ones that don't match, so responses meant for other callers are swallowed and those callers never see them.

What's the idiomatic way to do this? Looking at the AsyncChannel implementation, I notice that consumers are identified by an integer ID. An API where consumers could specify their own ID and were only resumed when, say, an Identifiable value matched it would work. But I don't know if that's the right abstraction.

Any pointers most welcome.

Maybe rubber ducking works :)

I replaced the channel in my monitor actor with a dictionary mapping request handles to continuations.

private func response(for handle: OcaUint32) async throws -> Ocp1Response {
    guard let monitor = await monitor else {
        throw Ocp1Error.notConnected
    }
    
    return try await withCheckedThrowingContinuation { continuation in
        Task { @MainActor in
            try await withTimeout(seconds: responseTimeout) {
                await monitor.subscribe(handle, to: continuation)
            }
        }
    }
}
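
withTimeout is not shown above. A minimal sketch of what such a helper might look like, assuming it races the operation against a sleep in a task group; the TimeoutError type and the exact signature are assumptions for illustration, not the code actually used here:

```swift
import Foundation

// Hypothetical error type for this sketch; the post itself uses Ocp1Error.responseTimeout.
struct TimeoutError: Error {}

/// Runs `operation` and throws `TimeoutError` if it has not finished within `seconds`.
/// This relies on cooperative cancellation: an operation that ignores cancellation
/// keeps the task group, and therefore this call, alive until it returns.
func withTimeout<T: Sendable>(
    seconds: TimeInterval,
    operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        // Child 1: the real work.
        group.addTask { try await operation() }
        // Child 2: the timer, which only ever finishes by throwing.
        group.addTask {
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw TimeoutError()
        }
        // Whichever child finishes first decides the outcome; cancel the other.
        defer { group.cancelAll() }
        guard let result = try await group.next() else {
            throw TimeoutError()
        }
        return result
    }
}
```

Two caveats apply when combining a helper like this with continuations. A task suspended on a checked continuation does not react to cancellation by itself, so something still has to resume the continuation once the deadline passes. And an error thrown inside an unstructured `Task { }` whose value is never awaited is silently dropped, so it never reaches the caller.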

The monitor resumes the continuation when it receives a matching packet (and on deinit it resumes any continuations that are still pending by throwing an error).

actor Monitor {
    typealias Continuation = CheckedContinuation<Ocp1Response, Error>
    private var continuations = [OcaUint32:Continuation]()

...
    // Store the continuation so it can be resumed when a response with this handle arrives.
    func subscribe(_ handle: OcaUint32, to continuation: Continuation) {
        self.continuations[handle] = continuation
    }

    // Remove and return the pending continuation, so that it is resumed at most once.
    func continuation(for handle: OcaUint32) -> Continuation? {
        self.continuations.removeValue(forKey: handle)
    }
}

I still have some bugs but I think I'm closer.
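
For what it's worth, here is a rough sketch of how the dictionary-of-continuations approach could handle the timeout inside the monitor itself, so that every continuation is resumed exactly once. The stand-in definitions of OcaUint32, Ocp1Response and Ocp1Error, and the method names response(for:timeout:), receive(_:) and timeOut(_:), are assumptions made so the example compiles on its own; they are not the library's real API.

```swift
import Foundation

// Stand-ins for the post's types, defined here only so the sketch compiles alone.
typealias OcaUint32 = UInt32
struct Ocp1Response: Sendable { let handle: OcaUint32 }
enum Ocp1Error: Error { case responseTimeout, notConnected }

actor Monitor {
    typealias Continuation = CheckedContinuation<Ocp1Response, Error>

    // A waiting caller plus the timer that fails it if nothing arrives in time.
    private struct Pending {
        let continuation: Continuation
        let timer: Task<Void, Never>
    }
    private var pending = [OcaUint32: Pending]()

    /// Suspends until a response with `handle` is received or `timeout` seconds pass.
    func response(for handle: OcaUint32, timeout: TimeInterval) async throws -> Ocp1Response {
        try await withCheckedThrowingContinuation { (continuation: Continuation) in
            // This closure runs synchronously on the actor, so registration
            // cannot interleave with receive(_:).
            // Assumption: handles are unique among in-flight requests. If one is
            // reused anyway, fail the older waiter rather than leak its continuation.
            finish(handle, with: .failure(CancellationError()))
            let timer = Task<Void, Never> { [weak self] in
                do {
                    try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
                } catch {
                    return // cancelled because the request already finished
                }
                await self?.timeOut(handle)
            }
            pending[handle] = Pending(continuation: continuation, timer: timer)
        }
    }

    /// Called by the receive loop for every decoded response.
    func receive(_ response: Ocp1Response) {
        finish(response.handle, with: .success(response))
    }

    private func timeOut(_ handle: OcaUint32) {
        // Runs inside the timer task. If finish(_:with:) already cancelled that task,
        // any entry now stored under this handle belongs to a newer request.
        guard !Task.isCancelled else { return }
        finish(handle, with: .failure(Ocp1Error.responseTimeout))
    }

    // Removes the entry before resuming, so a continuation is resumed at most once.
    private func finish(_ handle: OcaUint32, with result: Result<Ocp1Response, Error>) {
        guard let entry = pending.removeValue(forKey: handle) else { return }
        entry.timer.cancel()
        entry.continuation.resume(with: result)
    }
}
```

A caller would then await something like `monitor.response(for: command.handle, timeout: responseTimeout)`, while the receive loop passes each decoded packet to `receive(_:)`. Responses with no registered waiter are simply dropped here, so a reply that arrives before the waiter registers would be missed; registering before sending is one way around that. Cleanup on deinit and caller cancellation are left out of the sketch.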