AsyncStream stops yielding values

Hello I'm a beginner to Swift Concurrency and have run into an issue with AsyncStream. I've run into a situation that causes an observing of a for loop to receiving a values from an AsyncStream.

At the bottom is the code that you can copy it into a Swift Playground and run.

The code is supposed to mock a system that has a service going through a filter to read and write to a connection.

Here is a log of the prints

πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start
⬅️ Pretend to write 0
➑️ Pretend to read 0
	feed into filter 0
	yield write data 1
		πŸ™ˆπŸ«΄ setupRTFAsyncWrites: write(1 bytes)
		β¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: 1 bytes
⬅️ Pretend to write 1
➑️ Pretend to read 1
	feed into filter 1
	yield write data 2
// here our for loop should have picked up the value sent down the continuation. But instead it just sits here.
//: A UIKit based Playground for presenting user interface
  
import SwiftUI
import PlaygroundSupport
import Combine
import CommonCrypto
import Foundation

class TestConnection {
    var didRead: ((Data) -> ()) = { _ in }
    var count = 0
    init() {
    }

    func write(data: Data) {
        // pretend we sent this to the BT device
        print("⬅️ Pretend to write \(count)")

        Task {
            try await Task.sleep(ms: 200)
            print("➑️ Pretend to read \(self.count)")
            self.count += 1
            // pretend this is a response from the device
            self.didRead(Data([0x00]))
        }
    }
}

enum TestEvent: Sendable {
    case write(Data)
    case handshakeDone
}

class TestFilter {
    var eventsStream: AsyncStream<TestEvent>
    var continuation: AsyncStream<TestEvent>.Continuation

    private var count = 0

    init() {
        (self.eventsStream, self.continuation) = AsyncStream<TestEvent>.makeStream(bufferingPolicy: .unbounded)
    }

    func feed(data: Data) {
        print("\tfeed into filter \(count)")
        count += 1

        if count > 5 {
            print("\tβœ… handshake done")
            self.continuation.yield(.handshakeDone)
            return
        }

        Task {
            // data delivered to us by the BT device
            // pretend it takes time to process this and then we return an event sa
            try await Task.sleep(ms: 200)
            print("\tyield write data \(self.count)")
            // pretend this is a response from the device
            let result = self.continuation.yield(.write(Data([0x11])))
        }
    }

    func start() -> Data {
        return Data([0x00])
    }
}


class TestService {
    private let filter: TestFilter
    var task: Task<(), Never>?
    let testConn: TestConnection
    init(filter: TestFilter) {
        self.filter = filter

        self.testConn = TestConnection()
        self.testConn.didRead = { [weak self] data in
            self?.filter.feed(data: data)
        }

        self.task = Task { [weak self] () in
            await self?.setupAsyncWrites()
        }
    }

    func setupAsyncWrites() async {
        print("πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start")
        for await event in self.filter.eventsStream {
            print("\t\tπŸ™ˆπŸ«΄ setupRTFAsyncWrites: \(event)")

            guard case .write(let data) = event else {
                print("\t\tπŸ™ˆπŸ«΄ NOT data to device: \(event)")
                continue
            }

            print("\t\tβ¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: \(data)")
            self.testConn.write(data: data)
        } // for

        // This shouldn't end
        assertionFailure("This should not end")
    }

    public func handshake() async {
        let data = self.filter.start()
        self.testConn.write(data: data)
        await self.waitForHandshakedone()
    }

    private func waitForHandshakedone() async {
        for await event in self.filter.eventsStream {
            if case .handshakeDone = event {
                break
            }
            continue
        }
    }
}

Task {
    let service = TestService(filter: TestFilter())
    await service.handshake()
    print("Done")
}
/*
 This is what happens:
 πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start
 ⬅️ Pretend to write 0
 ➑️ Pretend to read 0
     feed into filter 0
     yield write data 1
         πŸ™ˆπŸ«΄ setupRTFAsyncWrites: write(1 bytes)
         β¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: 1 bytes
 ⬅️ Pretend to write 1
 ➑️ Pretend to read 1
     feed into filter 1
     yield write data 2

 // It just stops here, the `for` loop in setupAsyncWrites() should have picked up the event sent down the continuation after "yield write data 2"
 // It should say
 πŸ™ˆπŸ«΄ setupRTFAsyncWrites: write(1 bytes)
 β¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: 1 bytes
 */

extension Task<Never, Never> {
    public static func sleep(ms duration: UInt64) async throws {
        try await Task.sleep(nanoseconds: 1_000_000 * duration)
    }
}

Hi @Biclops, if I'm reading your code correctly it seems that you are subscribing to an AsyncStream twice: once in setupAsyncWrites and once in waitForHandshakedone. Unfortunately AsyncStream does not support multiple subscribers. You can look into using AsyncChannel from the swift-async-algorithms package, but it's also subtly different. In particular, the channel.send method will suspend until someone consumes the value being emitted.

Thank you that explains a lot. I'll be sure to check it out.

yes, welcome to the forums! to be slightly pedantic and expand on Brandon's response a bit, AsyncStream does 'support multiple subscribers', but only in a specific sense, which may not offer the behavior you're looking for.

in particular, if a stream is awaited by more than one iterator, then the one that receives the next() element depends on the order in which the stream registered the iterators' calls to next(). in practice this often means you'll observe arbitrary delivery of elements across such iterators, since a stream delivers its elements to exactly one consumer – it does not 'broadcast' to all of them.

so in your sample code, it's not that the stream has 'stopped' yielding its values, but rather that it is not delivering them to all of its pending consumers. if you add a print statement into the for-await loop in waitForHandshakedone() you'll observe that the next data write element is (generally) consumed there.

Thank you for the additional info. I'll try updating my example with AsyncChannel to see if things are fixed.

Hello again. I've updated my code below and it seems like I'm experiencing the same problem. My new example was put into a framework so I could import async algorithms. I only made minor changes, chiefly changing out AsyncStream with AsyncChannel.

After reading the documentation I don't think AsyncChannel was intended for multiple subscribers either.
Here's what it says

/// A channel for sending elements from one task to another with back pressure.
///
/// The `AsyncChannel` class is intended to be used as a communication type between tasks,
/// particularly when one task produces values and another task consumes those values. The back
/// pressure applied by `send(_:)` via the suspension/resume ensures that
/// the production of values does not exceed the consumption of values from iteration. This method
/// suspends after enqueuing the event and is resumed when the next call to `next()`
/// on the `Iterator` is made, or when `finish()` is called from another Task.
/// As `finish()` induces a terminal state, there is no more need for a back pressure management.
/// This function does not suspend and will finish all the pending iterations.

Here is my updated code

import XCTest
import AsyncAlgorithms

class AsyncChannelTests: XCTestCase {
    func testFetchData() async throws {
        // Call the async function
        let service = TestService(filter: TestFilter())
        await service.handshake()
        print("Done βœ…")

        /*
         This is what happens:
         πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start
         ⬅️ Pretend to write 0
         ➑️ Pretend to read 0
             feed into filter 0
             yield write data 1

         // It just stops here, the `for` loop in setupAsyncWrites() should have picked up the event sent down the continuation after "yield write data 2"
         // It should say
         πŸ™ˆπŸ«΄ setupRTFAsyncWrites: write(1 bytes)
         β¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: 1 bytes
         */
    }
}

class TestConnection {
    var didRead: ((Data) -> ()) = { _ in }
    var count = 0
    init() {
    }

    func write(data: Data) {
        // pretend we sent this to the BT device
        print("⬅️ Pretend to write \(count)")

        Task {
            try await Task.sleep(ms: 200)
            print("➑️ Pretend to read \(self.count)")

            self.count += 1
            // pretend this is a response from the device
            self.didRead(Data([0x00]))
        }
    }
}

enum TestEvent: Sendable {
    case write(Data)
    case handshakeDone
}

class TestFilter {
    var channel: AsyncChannel<TestEvent> = .init()

    private var count = 0

    init() {

    }

    func feed(data: Data) {
        print("\tfeed into filter \(count)")

        Task {
            count += 1
            if count > 5 {
                print("\tβœ… handshake done")
                await self.channel.send(.handshakeDone)
                return
            }
            // data delivered to us by the BT device
            // pretend it takes time to process this and then we return an event sa
            try await Task.sleep(ms: 200)
            print("\tyield write data \(self.count)")
            // pretend this is a response from the device
            await self.channel.send(.write(Data([0x11])))
        }
    }

    func start() -> Data {
        return Data([0x00])
    }
}


class TestService {
    private let filter: TestFilter
    var task: Task<(), Never>?
    let testConn: TestConnection
    init(filter: TestFilter) {
        self.filter = filter

        self.testConn = TestConnection()
        self.testConn.didRead = { [weak self] data in
            self?.filter.feed(data: data)
        }

        self.task = Task { [weak self] () in
            await self?.setupAsyncWrites()
        }
    }

    func setupAsyncWrites() async {
        print("πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start")
        for await event in self.filter.channel {
            print("\t\tπŸ™ˆπŸ«΄ setupRTFAsyncWrites: \(event)")

            guard case .write(let data) = event else {
                print("\t\tπŸ™ˆπŸ«΄ NOT data to device: \(event)")
                continue
            }

            print("\t\tβ¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: \(data)")
            self.testConn.write(data: data)
        } // for

        // This shouldn't end
        assertionFailure("This should not end")
    }

    public func handshake() async {
        let data = self.filter.start()
        self.testConn.write(data: data)
        await self.waitForHandshakedone()
    }

    private func waitForHandshakedone() async {
        for await event in self.filter.channel {
            if case .handshakeDone = event {
                break
            }
            continue
        }
    }
}


extension Task<Never, Never> {
    public static func sleep(ms duration: UInt64) async throws {
        try await Task.sleep(nanoseconds: 1_000_000 * duration)
    }
}

Here I was able to fix it by using a PassthroughSubject and then converting it to an AsyncSequence. But now that I think about it, I think I'm going about this the wrong way. And that I should re-jigger my code to follow the structured concurrency paradigm.

// Unit test class
import XCTest
import AsyncAlgorithms
import Combine
class AsyncChannelTests: XCTestCase {
    func testFetchData() async throws {
        // Call the async function
        let service = TestService(filter: TestFilter())
        try await service.handshake()
        print("Done βœ…")

        /*
         This is what happens:
         πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start
         ⬅️ Pretend to write 0
         ➑️ Pretend to read 0
             feed into filter 0
             yield write data 1

         // It just stops here, the `for` loop in setupAsyncWrites() should have picked up the event sent down the continuation after "yield write data 2"
         // It should say
         πŸ™ˆπŸ«΄ setupRTFAsyncWrites: write(1 bytes)
         β¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: 1 bytes
         */
    }
}

class TestConnection {
    var didRead: ((Data) -> ()) = { _ in }
    var count = 0
    init() {
    }

    func write(data: Data) {
        // pretend we sent this to the BT device
        print("⬅️ Pretend to write \(count)")

        Task {
            try await Task.sleep(ms: 200)
            print("➑️ Pretend to read \(self.count)")

            self.count += 1
            // pretend this is a response from the device
            self.didRead(Data([0x00]))
        }
    }
}

enum TestEvent: Sendable {
    case write(Data)
    case handshakeDone
}

class TestFilter {
    var stream: AsyncThrowingPublisher<AnyPublisher<TestEvent, Never>> {
        self.subj.eraseToAnyPublisher().values
    }

    private var subj = PassthroughSubject<TestEvent, Never>()

    private var count = 0

    init() {

    }

    func feed(data: Data) {
        print("\tfeed into filter \(count)")

        Task {
            count += 1
            if count > 5 {
                print("\tβœ… handshake done")
                self.subj.send(.handshakeDone)
                return
            }
            // data delivered to us by the BT device
            // pretend it takes time to process this and then we return an event sa
            try await Task.sleep(ms: 200)
            print("\tyield write data \(self.count)")
            // pretend this is a response from the device
            self.subj.send(.write(Data([0x11])))
        }
    }

    func start() -> Data {
        return Data([0x00])
    }
}


class TestService {
    private let filter: TestFilter
    var task: Task<(), Never>?
    let testConn: TestConnection
    init(filter: TestFilter) {
        self.filter = filter

        self.testConn = TestConnection()
        self.testConn.didRead = { [weak self] data in
            self?.filter.feed(data: data)
        }

        self.task = Task { [weak self] () in
            await self?.setupAsyncWrites()
        }
    }

    func setupAsyncWrites() async {
        do {
            print("πŸ™ˆπŸ«΄ setupRTFAsyncWrites Start")
            for try await event in self.filter.stream {
                print("\t\tπŸ™ˆπŸ«΄ setupRTFAsyncWrites: \(event)")

                guard case .write(let data) = event else {
                    print("\t\tπŸ™ˆπŸ«΄ NOT data to device: \(event)")
                    continue
                }

                print("\t\tβ¬…οΈπŸ™ˆπŸ«΄ Async Event: dataToDevice: \(data)")
                self.testConn.write(data: data)
            } // for

            // This shouldn't end
            assertionFailure("This should not end")
        } catch {
            print(error)
        }
    }

    public func handshake() async throws {
        let data = self.filter.start()
        self.testConn.write(data: data)
        try await self.waitForHandshakedone()
    }

    private func waitForHandshakedone() async throws {
        for try await event in self.filter.stream {
            if case .handshakeDone = event {
                break
            }
            continue
        }
    }
}


extension Task<Never, Never> {
    public static func sleep(ms duration: UInt64) async throws {
        try await Task.sleep(nanoseconds: 1_000_000 * duration)
    }
}

Neither AsyncStream nor AsyncChannel support the broadcasting (sharing subscriptions) functionality you want to achieve. There was a proposal for adding this here: [WIP] Broadcast algorithm by phausler Β· Pull Request #214 Β· apple/swift-async-algorithms Β· GitHub.

I think for now, without rewriting a bunch of stuff, your best bet is to stick to Combine's subjects and bridging them to Concurrency like you did in your last post. I am currently doing the same in a couple of projects :man_shrugging:.

I've noticed that if I dispatch values through the subject too rapidly the subscribers may miss the dispatch. Do you know if there's a way around that?

You could use Combineβ€˜s buffer: buffer(size:prefetch:whenFull:) | Apple Developer Documentation