AsyncSequence / Stream version of PassthroughSubject or CurrentValueSubject?

Hello everyone,

I have searched for a couple of hours on async sequences / streams (I am still new to Swift concurrency) and cannot find anything that resembles a PassthroughSubject or CurrentValueSubject from Combine, which I could easily bridge from imperative code (think delegates from CoreLocation, CoreBluetooth, etc.).
Can someone point me to documentation for such a type if it exists, or share some resources on how to implement one?

Using the examples above, I would like to achieve something like:

for await location in locationService.locationsSequence {
  print(location)
}

Thank you in advance :slight_smile:

1 Like

I don't think there's a real subject equivalent yet. There's some work going on in the AsyncAlgorithms package, but we don't have the full multicast capabilities of Combine. There's AsyncChannel (sp?) but the equivalent send(_:) function is async and isn't the fire-and-forget API that a Combine Subject permits.

One thing that I've been using as a stopgap is to wrap a Subject in an AsyncStream. This leans on Combine's multicasting capabilities while we wait for something comparable for AsyncSequence.

Essentially, create a new AsyncStream for each subscriber that wraps the subject you wish to multicast.

I would typically create a convenience along the lines of:

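import Combine

final class SomeClass {

    private let someTypeSubject = PassthroughSubject<SomeType, Never>()

    // Returns a fresh stream on every access, so each consumer gets its own subscription.
    public var someTypeStream: AsyncStream<SomeType> {
        AsyncStream { continuation in
            let cancellable = self.someTypeSubject.sink { continuation.yield($0) }
            // The closure's argument is the termination reason, which is unused here.
            continuation.onTermination = { _ in
                cancellable.cancel()
            }
        }
    }
}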
4 Likes

I just looked at AsyncChannel from AsyncAlgorithms and it seems the closest to PassthroughSubject. Gonna take it for a spin.
Thanks for the suggestion

2 Likes

Great, let us know how you get on with it.

1 Like

There is a CurrentValue async stream in the AsyncExtensions package: AsyncExtensions/AsyncStreams+CurrentValue.swift at main · sideeffect-io/AsyncExtensions · GitHub

2 Likes

I've been banging on roughly this problem this week: GitHub - rustle/TaskHostedAsyncSequence

Some stuff that works but I don't love:

  • I'm grabbing the continuation in a way that assumes it's vended synchronously :frowning:
  • I'm using a CheckedContinuation to wait for the task to get going, which I wouldn't be surprised to find out is messy

I wanted something that would work back to iOS 14 and macOS 10.15 or I'd probably have used buffer + AsyncPublisher/AsyncThrowingPublisher

public struct TaskHostedAsyncSequence<Output>: Sendable {
    private var continuation: AsyncStream<Output>.Continuation!
    private var task: Task<Void, Never>!
    public init(receiveValue: @escaping (Output) async -> Void) async {
        // Setup stream and continuation
        var streamContinuation: AsyncStream<Output>.Continuation?
        let stream = AsyncStream {
            streamContinuation = $0
        }
        precondition(streamContinuation != nil)
        self.continuation = streamContinuation
        // Wait for Task to get going
        await withCheckedContinuation { taskLaunchContinuation in
            self.task = Task {
                taskLaunchContinuation.resume()
                guard !Task.isCancelled else { return }
                for await value in stream {
                    guard !Task.isCancelled else { break }
                    await receiveValue(value)
                }
            }
        }
    }
    public func send(_ value: Output) {
        continuation.yield(value)
    }
    public func finish() async {
        // Wrap up the stream
        continuation.finish()
        // Wait for the Task to exit
        _ = await task.result
    }
}

public struct TaskHostedThrowingAsyncSequence<Output>: Sendable {
    private var continuation: AsyncThrowingStream<Output, any Error>.Continuation!
    private var task: Task<Void, any Error>!
    public init(receiveValue: @escaping (Output) async throws -> Void) async {
        // Setup stream and continuation
        var streamContinuation: AsyncThrowingStream<Output, any Error>.Continuation?
        let stream = AsyncThrowingStream {
            streamContinuation = $0
        }
        precondition(streamContinuation != nil)
        self.continuation = streamContinuation
        // Wait for Task to get going
        await withCheckedContinuation { taskLaunchContinuation in
            self.task = Task {
                taskLaunchContinuation.resume()
                try Task.checkCancellation()
                for try await value in stream {
                    try Task.checkCancellation()
                    try await receiveValue(value)
                }
            }
        }
    }
    public func send(_ value: Output) {
        continuation.yield(value)
    }
    public func finish() async throws {
        // Wrap up the stream
        continuation.finish()
        // Wait for the Task to exit
        switch await task.result {
        case .success(_):
            return
        case .failure(let error):
            throw error
        }
    }
}
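
On the first bullet above (grabbing the continuation on the assumption that it is vended synchronously): newer toolchains ship AsyncStream.makeStream(of:bufferingPolicy:) (Swift 5.9, SE-0388), which returns the stream and its continuation together. Below is a minimal single-consumer sketch of the delegate bridge from the original question under that assumption. Location, LocationService and didUpdate(_:) are hypothetical stand-ins for illustration, not CoreLocation API.

```swift
// Hypothetical value type standing in for CLLocation.
struct Location: Sendable, Equatable {
    let latitude: Double
    let longitude: Double
}

// Hypothetical service; in a real app the delegate callback would call didUpdate(_:).
final class LocationService {
    let locationsSequence: AsyncStream<Location>
    private let continuation: AsyncStream<Location>.Continuation

    init() {
        // makeStream returns both halves at once (requires a Swift 5.9+ toolchain).
        let (stream, continuation) = AsyncStream.makeStream(of: Location.self)
        self.locationsSequence = stream
        self.continuation = continuation
    }

    // yield(_:) is synchronous, so this is fire-and-forget from imperative code.
    func didUpdate(_ location: Location) {
        continuation.yield(location)
    }

    // Ends the stream; a `for await` loop over locationsSequence then exits.
    func stop() {
        continuation.finish()
    }
}
```

This only supports one consumer of locationsSequence, and with the default buffering policy values sent before iteration starts are queued rather than dropped.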
1 Like

This example use of AsyncStream is great as it allows us to connect non-Sendable emission logic to an AsyncSequence which itself will be Sendable if the Element it emits is Sendable. For example in my personal use case I want to move some UI events generated by a PassthroughSubject into the background and Sendable realm.

There's one easy-to-miss catch, though. The AsyncStream in the example is flawed because it buffers the events sent by the PassthroughSubject, which is not what you'd expect from such a Subject. To fix this, explicitly change the AsyncStream<Element>.Continuation.BufferingPolicy from the default .unbounded to .bufferingOldest(0). Yielding a new value then resumes a pending call to iterator.next(), but the value is discarded if nothing is currently awaiting.

Here's a playground example:

import PlaygroundSupport
import Foundation
import Combine

let page = PlaygroundPage.current
page.needsIndefiniteExecution = true

func foo() async {
  let subject = PassthroughSubject<Int, Never>()
  let stream = AsyncStream(bufferingPolicy: .bufferingOldest(0)) { continuation in
    let cancellable = subject.sink { signal in
      continuation.yield(signal)
    }
    continuation.onTermination = { _ in
      cancellable.cancel()
    }
  }

  var iterator = stream.makeAsyncIterator()

  Task {
    print("will wait", Date.now)
    do {
      try await Task.sleep(nanoseconds: 2_000_000_000)
    } catch {
      print(error)
    }
    print("will resume and send", Date.now)

    subject.send(1)
    subject.send(2)

    print("will wait", Date.now)
    do {
      try await Task.sleep(nanoseconds: 4_000_000_000)
    } catch {
      print(error)
    }
    print("will resume and send", Date.now)

    subject.send(3)

    print("will wait", Date.now)
    do {
      try await Task.sleep(nanoseconds: 1_000_000_000)
    } catch {
      print(error)
    }
    print("will resume and send", Date.now)
    subject.send(4)
  }

  print(await iterator.next() as Any)

  do {
    try await Task.sleep(nanoseconds: 2_000_000_000)
  } catch {
    print(error)
  }

  print(await iterator.next() as Any)
  print(await iterator.next() as Any)
}

Task {
  await foo()
  print("finishing")
  page.finishExecution()
}

With .unbounded this example prints 1, 2, 3, but with .bufferingOldest(0) it prints 1, 3, 4: the value 2 is missed, as expected.
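
For anyone who wants to see the buffering difference without Combine or timers, here is a smaller sketch. It yields three values before anything is iterating and then collects whatever the buffer kept. The helper name collect(_:) is made up for this example.

```swift
// Hypothetical helper: yields 1, 2, 3 while nobody is awaiting, finishes the
// stream, then returns whatever the chosen buffering policy retained.
func collect(_ policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        // The build closure runs synchronously, before any consumer exists.
        continuation.yield(1)
        continuation.yield(2)
        continuation.yield(3)
        continuation.finish()
    }
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}

// Inside an async context:
//   await collect(.unbounded)          // [1, 2, 3]
//   await collect(.bufferingOldest(0)) // []
```

With a zero-sized buffer, every value yielded while no next() call is pending is dropped, which matches the PassthroughSubject behaviour described above.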

One further note: cancellable is of a non-Sendable type (at the moment), so under strict concurrency checking it cannot be captured by continuation.onTermination. This is the issue I'm trying to solve right now.

Current workaround:

@preconcurrency import Combine
1 Like

Yes, this is absolutely true. In my use case it didn't matter too much as I was immediately iterating the AsyncStream but if you intend to hold on to it for any period of time, it's critical to consider the buffering policy. Thanks for highlighting this.

The other important note is that this doesn't make AsyncStream a 'multi-pass' AsyncSequence; a fresh AsyncStream is required for each new consumer to safely iterate the values from the Subject.

More generally, I'm really keen on seeing a Subject-shaped AsyncSequence type in the standard library, but my feeling is we're stuck waiting for a resolution on typed throws and the primary associated type for AsyncSequence before this type and the full potential of AsyncSequence can be realised.

1 Like

In my case that's fine as I only need a single stream instance to work with at a single place.


After toying around with the buffering policy and reading through the internal buffer implementation, I just realized that with your example and a slight adjustment to .bufferingNewest(1) we can wrap and mimic CurrentValueSubject as well.
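
A rough sketch of that idea without Combine, for a single consumer only. LatestValueRelay is a made-up name, and it uses AsyncStream.makeStream, so it assumes a Swift 5.9+ toolchain. The initial value is seeded into the one-slot buffer, and each later send replaces whatever is still waiting there.

```swift
// Hypothetical single-consumer helper that keeps only the most recent value,
// loosely mimicking CurrentValueSubject's "latest value on subscribe" behaviour.
final class LatestValueRelay<Element: Sendable> {
    let stream: AsyncStream<Element>
    private let continuation: AsyncStream<Element>.Continuation

    init(_ initialValue: Element) {
        let (stream, continuation) = AsyncStream.makeStream(
            of: Element.self,
            bufferingPolicy: .bufferingNewest(1)
        )
        self.stream = stream
        self.continuation = continuation
        // Seed the buffer so the first next() returns a value immediately.
        continuation.yield(initialValue)
    }

    // Replaces any value still waiting in the one-slot buffer.
    func send(_ value: Element) {
        continuation.yield(value)
    }

    func finish() {
        continuation.finish()
    }
}
```

Unlike CurrentValueSubject, there is no synchronous value property here, and a consumer that is already awaiting receives every value rather than only the latest.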

1 Like

I think that would work with your use case, as you're using it for a single consumer. But if someone wishes to share the Subject amongst multiple consumers, they'd need to create a new AsyncStream, and therefore a new buffer, for each consumer, so it may not work quite as expected in that case.
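
To make the "one stream and one buffer per consumer" point concrete, here is a minimal Combine-free sketch. Broadcaster and its methods are hypothetical names, and the locking is deliberately simple, so treat it as an illustration rather than a production multicast type.

```swift
import Foundation

// Hypothetical multicast helper: every call to makeStream() registers its own
// continuation, so each consumer gets a separate AsyncStream and buffer.
final class Broadcaster<Element: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuations: [UUID: AsyncStream<Element>.Continuation] = [:]

    func makeStream(
        bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
    ) -> AsyncStream<Element> {
        AsyncStream(Element.self, bufferingPolicy: bufferingPolicy) { continuation in
            let id = UUID()
            lock.lock()
            continuations[id] = continuation
            lock.unlock()
            // Unregister when the consumer stops iterating or the stream finishes.
            continuation.onTermination = { [weak self] _ in
                self?.remove(id)
            }
        }
    }

    // Fire-and-forget: fans the value out to every registered consumer.
    func send(_ value: Element) {
        lock.lock()
        let current = Array(continuations.values)
        lock.unlock()
        for continuation in current {
            continuation.yield(value)
        }
    }

    func finish() {
        lock.lock()
        let current = Array(continuations.values)
        continuations.removeAll()
        lock.unlock()
        // Finish outside the lock, because onTermination calls back into remove(_:).
        for continuation in current {
            continuation.finish()
        }
    }

    private func remove(_ id: UUID) {
        lock.lock()
        continuations[id] = nil
        lock.unlock()
    }
}
```

Each consumer calls makeStream() once and iterates its own stream. A consumer that registers late only sees values sent after it registered.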

True, I also ran into some tight corners, as I need to consume the values and then perform typical async transformations like merge. Since I had to operate at the AsyncIterator level for my needs, I'm out of luck moving back into the AsyncSequence realm. In my particular case I have inout AsyncIterator in so many places just to make sure that I'm mutating the same instance and not assuming that it's built with reference semantics, even though it is in several cases.

AsyncSequence really needs more love ASAP.

2 Likes

Totally agree. I'm hopeful once we see resolution on typed throws and primary associated types, we'll see some forward progress. Until then though, my feeling is that we're severely hamstrung on what we can do to get it to parity with other reactive frameworks.

Fingers-crossed for 2023.

2 Likes

Seems like there's a spread of support indirectly via the following two libraries:

  1. AsyncExtensions -> GitHub - sideeffect-io/AsyncExtensions: AsyncExtensions aims to mimic Swift Combine operators for async sequences.
  2. Asynchrone -> GitHub - reddavis/Asynchrone: Extensions and additions to AsyncSequence, AsyncStream and AsyncThrowingStream.

Either of these provides comprehensive support on top of the core async/await functionality and offers a nice way to work with async sequences like reactive streams.

I would suggest reviewing your requirements and digging into both libraries. Both are supported, although AsyncExtensions has a more recent release. However, the Asynchrone library seems more comprehensive.

2 Likes