Support multiple listeners with Swift Concurrency

I would like to have a simple generic object that looks something like this:

actor MultipleListenerOperation<Success> {
    let operation: () async throws -> Success

    init(operation: @escaping @Sendable () async throws -> Success) {
        self.operation = operation
    }
    
    // starts the operation
    func start() {
        fatalError("...")
    }

    // If the operation hasn't started it waits for the operation to start and then
    // waits for the result of the operation.
    // If the operation has started, waits for the result of the operation.
    // This property supports multiple concurrent consumers.
    var result: Success {
        get async throws {
            fatalError("...")
        }
    }
}

The idea is that you instantiate it with a block to perform, and multiple consumers can then await the result.

I've been able to implement it with an AsyncPublisher driven by a subject. I've also been able to implement it by posting notifications. I've also toyed with the idea of holding onto multiple continuations, and there is an example of that type of solution here.
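For illustration, the continuation-based idea might look roughly like the sketch below. It is not the linked example: the type name ContinuationBackedOperation, the waiters array, and the private run() helper are all made up here, and cancellation of individual awaiters is deliberately not handled.

```swift
// Hypothetical sketch: one stored continuation per pending consumer.
actor ContinuationBackedOperation<Success: Sendable> {
    private let operation: @Sendable () async throws -> Success
    private var outcome: Result<Success, Error>?
    private var waiters: [CheckedContinuation<Success, Error>] = []
    private var started = false

    init(operation: @escaping @Sendable () async throws -> Success) {
        self.operation = operation
    }

    // Starts the operation at most once.
    func start() {
        guard !started else { return }
        started = true
        Task { await self.run() }
    }

    // Runs the operation, stores the outcome, and resumes every pending consumer.
    private func run() async {
        let finished: Result<Success, Error>
        do {
            finished = .success(try await operation())
        } catch {
            finished = .failure(error)
        }
        outcome = finished
        for waiter in waiters {
            waiter.resume(with: finished)
        }
        waiters.removeAll()
    }

    var result: Success {
        get async throws {
            // Already finished: hand back the stored outcome.
            if let outcome { return try outcome.get() }
            // Otherwise park this consumer until run() resumes it.
            return try await withCheckedThrowingContinuation { continuation in
                waiters.append(continuation)
            }
        }
    }
}
```

Even in this small form there is a stored outcome, a started flag, and an array of continuations that all have to stay consistent.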

Although they work, none of these solutions seem optimal. They all seem heavy-handed. For example, using an AsyncSequence for a single value seems too heavy.

Any thoughts on the "proper" Swift Concurrency way of solving this? It seems like there is something missing or I'm missing something.

I myself consider anything and everything that the Swift Concurrency system has to offer as proper Swift Concurrency, but I suppose here you are trying to avoid *Continuations, AsyncStreams etc.

You could do it this way, using only Tasks:

actor MultipleListenerOperation<Success: Sendable> {
    private let operationTask: Task<Success, Error>
    private let waitTask: Task<Void, Never>

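    init(operation: @escaping @Sendable () async throws -> Success) {
        // Gate task: sleeps (effectively forever) until start() cancels it.
        // try? discards the CancellationError thrown when the sleep is cancelled.
        let _waitTask = Task<Void, Never> {
            try? await Task.sleep(until: .now + .nanoseconds(Int.max), clock: .continuous)
        }
        self.waitTask = _waitTask
        // The operation task is created right away but first waits for the gate to finish.
        self.operationTask = Task {
            await _waitTask.value
            return try await operation()
        }
    }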
    
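    // Starts the operation: cancelling the wait task ends its sleep,
    // which lets operationTask proceed to run the operation.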
    func start() {
        waitTask.cancel()
    }

    // If the operation hasn't started it waits for the operation to start and then
    // waits for the result of the operation.
    // If the operation has started, waits for the result of the operation.
    // This property supports multiple concurrent consumers.
    var result: Success {
        get async throws {
            try await operationTask.value
        }
    }
}

I suppose what you are "missing" or looking for is a Task-like object that can be manually started, since the ordinary Task already supports multiple consumers.
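The multiple-consumer behaviour of an ordinary Task can be seen in a small sketch like the one below. The names CallCounter and demoSharedTask are made up for the illustration; the counter is only there to show that the task body runs once no matter how many consumers await its value.

```swift
// Hypothetical helper that counts how often the task body runs.
actor CallCounter {
    private(set) var count = 0
    func increment() { count += 1 }
}

func demoSharedTask() async -> (values: [Int], runs: Int) {
    let counter = CallCounter()

    // One task; its body executes a single time.
    let shared = Task<Int, Never> {
        await counter.increment()
        return 42
    }

    // Three concurrent consumers all await the same task's value.
    let values = await withTaskGroup(of: Int.self) { group -> [Int] in
        for _ in 0..<3 {
            group.addTask { await shared.value }
        }
        var collected: [Int] = []
        for await value in group {
            collected.append(value)
        }
        return collected
    }

    let runs = await counter.count
    return (values: values, runs: runs)
}
```

Each consumer receives the same value, and awaiting an already finished task simply returns its stored result.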

Thank you for the reply!

I myself consider anything and everything that the Swift Concurrency system has to offer as proper Swift Concurrency

I see, sure that makes sense.

but I suppose here you are trying to avoid *Continuations, AsyncStreams etc.

AsyncStream feels heavy-handed. A single continuation I think would be fine, but keeping track of a collection of them felt wrong.

Is there any significant overhead with this method?

Thanks for the thought you put into this!

There is the overhead of getting the current time (.now) and then enqueueing an internal sleep task on the global concurrent executor (basically a call to dispatch_after_f). I'm not totally certain, but I think that the sleep task is allocated on the Task's (here _waitTask's) memory slab.

The variant

try? await Task.sleep(nanoseconds: UInt64(Int.max))

doesn't retrieve the current time, so it might have less overhead, but I honestly don't know.
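Dropped into the actor, that variant might look like the sketch below, together with a short usage example. The names GatedOperation, gate, and demoGatedOperation are made up here, and the sketch assumes that a sleep of this length behaves like the clock-based one (it stays suspended until cancelled). It makes no claim about which variant is cheaper.

```swift
// Hypothetical variant of the actor using Task.sleep(nanoseconds:) for the gate.
actor GatedOperation<Success: Sendable> {
    private let operationTask: Task<Success, Error>
    private let gate: Task<Void, Never>

    init(operation: @escaping @Sendable () async throws -> Success) {
        // The gate sleeps until cancelled; try? discards the CancellationError.
        let gate = Task<Void, Never> {
            try? await Task.sleep(nanoseconds: UInt64(Int.max))
        }
        self.gate = gate
        // The operation only begins once the gate task has finished.
        self.operationTask = Task {
            await gate.value
            return try await operation()
        }
    }

    // Cancelling the gate ends its sleep and releases the operation.
    func start() {
        gate.cancel()
    }

    var result: Success {
        get async throws {
            try await operationTask.value
        }
    }
}

// Usage: two consumers start waiting before the operation is started.
func demoGatedOperation() async throws -> [String] {
    let op = GatedOperation<String> { "done" }
    let first = Task { try await op.result }
    let second = Task { try await op.result }
    await op.start()
    return try await [first.value, second.value]
}
```

Both consumers end up with the same result, whether they began waiting before or after start() was called.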
