AsyncSequence / Stream version of PassthroughSubject or CurrentValueSubject?

Hello everyone,

I have searched for couple of hours on async sequence / stream (I am still new to the swift concurrency) and can not find anything that resembles a PassthroughSubject or CurrentValueSubject from Combine which I can easily bridge from imperative code (think delegates from CoreLocation, CoreBluetooth etc.).
Can someone point me to a documentation for such type if it exists or share some resource on how to implement one if available?

If I use the above examples I would want to achieve something like

for await location in locationService.locationsSequence {
  print(location)
}

Thank you in advance :slight_smile:

1 Like

I don't think there's a real subject equivalent yet. There's some work going on in the AsyncAlgorithms package, but we don't have the full multicast capabilities of Combine. There's AsyncChannel (sp?) but the equivalent send(_:) function is async and isn't the fire-and-forget API that a Combine Subject permits.

One thing that I've been using as a stop gap is to wrap a Subject in an AsyncStream. This leans on Combine's multicasting capabilities while we wait for some enhanced multicasting capabilities.

Essentially, create a new new AsyncStream for each subscriber that wraps the subject you wish to multicast.

I would typically create a convenience along the lines of:

final class SomeClass {

    private let someTypeSubject = PassthroughSubject<SomeType, Never>() 
    
    public var someTypeStream: AsyncStream<SomeType> {
        AsyncStream { continuation in
            let cancellable = self.someTypeSubject.sink { continuation.yield($0) }
            continuation.onTermination = { continuation in
                cancellable.cancel()
            }
        }
    }
}
2 Likes

I just looked at AsyncChannel from AsyncAlgorithms and seems the closest to PassthroughSubject. Gonna take it for a spin.
Thanks for the suggestion

1 Like

Great, let us know how you get on with it.

1 Like

You have CurrentValue async stream here in AsyncExtension Package AsyncExtensions/AsyncStreams+CurrentValue.swift at main · sideeffect-io/AsyncExtensions · GitHub

1 Like

I've been banging on roughly this problem this week GitHub - rustle/TaskHostedAsyncSequence

Some stuff that works but I don't love:

  • I'm grabbing the continuation in a way that assumes it's vended synchronously :frowning:
  • I'm using a CheckedContinuation to wait for the task to get going, which I wouldn't be surprised to find out is messy

I wanted something that would work back to iOS 14 and macOS 10.15 or I'd probably have used buffer + AsyncPublisher/AsyncThrowingPublisher

public struct TaskHostedAsyncSequence<Output>: Sendable {
    private var continuation: AsyncStream<Output>.Continuation!
    private var task: Task<Void, Never>!
    public init(receiveValue: @escaping (Output) async -> Void) async {
        // Setup stream and continuation
        var streamContinuation: AsyncStream<Output>.Continuation?
        let stream = AsyncStream {
            streamContinuation = $0
        }
        precondition(streamContinuation != nil)
        self.continuation = streamContinuation
        // Wait for Task to get going
        await withCheckedContinuation { taskLaunchContinuation in
            self.task = Task {
                taskLaunchContinuation.resume()
                guard !Task.isCancelled else { return }
                for await value in stream {
                    guard !Task.isCancelled else { break }
                    await receiveValue(value)
                }
            }
        }
    }
    public func send(_ value: Output) {
        continuation.yield(value)
    }
    public func finish() async {
        // Wrap up the stream
        continuation.finish()
        // Wait for the Task to exit
        _ = await task.result
    }
}

public struct TaskHostedThrowingAsyncSequence<Output>: Sendable {
    private var continuation: AsyncThrowingStream<Output, any Error>.Continuation!
    private var task: Task<Void, any Error>!
    public init(receiveValue: @escaping (Output) async throws -> Void) async {
        // Setup stream and continuation
        var streamContinuation: AsyncThrowingStream<Output, any Error>.Continuation?
        let stream = AsyncThrowingStream {
            streamContinuation = $0
        }
        precondition(streamContinuation != nil)
        self.continuation = streamContinuation
        // Wait for Task to get going
        await withCheckedContinuation { taskLaunchContinuation in
            self.task = Task {
                taskLaunchContinuation.resume()
                try Task.checkCancellation()
                for try await value in stream {
                    try Task.checkCancellation()
                    try await receiveValue(value)
                }
            }
        }
    }
    public func send(_ value: Output) {
        continuation.yield(value)
    }
    public func finish() async throws {
        // Wrap up the stream
        continuation.finish()
        // Wait for the Task to exit
        switch await task.result {
        case .success(_):
            return
        case .failure(let error):
            throw error
        }
    }
}
1 Like