Can/Should AsyncStream have runtime checks for multiple subscribers?

I made this thread for two purposes: first, to get feedback on whether some code I've written solves the problem in the thread title; second, to get opinions on whether a feature like this would be possible/useful to implement in AsyncStream itself, or if there's otherwise any way to get it in my codebase by default for AsyncStream users without a lint rule banning AsyncStream entirely.

In my (somewhat limited) experience seeing new developers adopt Swift Concurrency, the behavior of AsyncStream is a common source of bugs. Developers used to RxSwift or Combine assume that it supports multiple subscribers, and are surprised and dismayed to discover, often after landing code, that it does not. There's currently no warning about this either, just (seemingly) undefined behavior. And since shared AsyncSequences are usually used to communicate across features, it's unlikely that unit tests catch the problem.

I think I understand the reasoning behind this decision: an AsyncSequence type can guarantee that it delivers all values sent to it to one subscriber, or it can guarantee that it delivers the last N values to multiple subscribers, but it can't do both without knowing the exact number of subscribers in advance. Bugs stemming from choosing a buffer size that's too small are much harder to detect than the current bad behavior.

I tried to implement a wrapper around AsyncStream that detects this case without false positives. The high level summary of how it works is as follows:

  1. Whenever makeAsyncIterator() is called, a new entry is added to a Set.
  2. Whenever the returned AsyncIterator is deinited, the entry is removed
    (detected by a sentinel class being deinited by ARC, since AsyncIterator cannot conform to ~Copyable).
  3. Whenever the Task the subscription is in is cancelled, the entry is removed (the next time the AsyncSequence yields, it will return nil, so it is now safe to reuse despite still being referenced).

In my limited testing so far, it seems to work. But are there any potential bugs in this approach?

import Synchronization

public struct CheckedAsyncStream<Element>: AsyncSequence {
    
    // TODO: duplicate AsyncStream inits
    
    public init(
        backing: AsyncStream<Element>
    ) {
        self.backing = backing
    }
    
    private let backing: AsyncStream<Element>
    private let tracker: Tracker = .init()
    
    fileprivate final class Tracker: Sendable {
        let users: Mutex<Set<ObjectIdentifier>> = .init([])
    }
        
    public func makeAsyncIterator() -> AsyncIterator {
        let count = tracker.users.withLock { set in
            set.count
        }
        if count > 0 {
            assertionFailure("CheckedAsyncStream is being shared!")
        }
        return AsyncIterator(
            backing: backing.makeAsyncIterator(),
            tracker: tracker
        )
    }
    
    public struct AsyncIterator: AsyncIteratorProtocol {
        
        fileprivate init(
            backing: AsyncStream<Element>.Iterator,
            tracker: Tracker
        ) {
            self.backing = backing
            self.tracker = .init(tracker: tracker)
        }
        
        fileprivate final class DeallocationTracker: Sendable {
            
            init(
                tracker: Tracker
            ) {
                self.tracker = tracker
                tracker.users.withLock { set in
                    _ = set.insert(ObjectIdentifier(self))
                }
            }
            
            let tracker: Tracker
            
            func remove() {
                tracker.users.withLock { set in
                    _ = set.remove(ObjectIdentifier(self))
                }
            }
            
            deinit {
                // When this object is deinited all copies of the struct are gone
                // and it's safe to reuse.
                remove()
            }
            
        }
        
        fileprivate let tracker: DeallocationTracker
        private var backing: AsyncStream<Element>.Iterator
        
        public mutating func next() async -> Element? {
            await withTaskCancellationHandler {
                await backing.next()
            } onCancel: { [tracker] in
                // If the task is cancelled the backing stream will emit nil when it resumes,
                // so it's safe to reuse even though it is still referenced by a different subscriber.
                tracker.remove()
            }
        }
        
        public mutating func next(
            isolation actor: isolated (any Actor)?
        ) async throws(Never) -> Element? {
            await withTaskCancellationHandler {
                // Forward the caller's isolation so the backing iterator doesn't hop executors.
                await backing.next(isolation: actor)
            } onCancel: { [tracker] in
                // If the task is cancelled the backing stream will emit nil when it resumes,
                // so it's safe to reuse even though it is still referenced by a different subscriber.
                // Technically I guess you could share the iterator with a different Task and start
                // listening again, but that's probably bad practice, and I think it's fine if it is
                // no longer supported.
                tracker.remove()
            }
        }
        
    }
    
}

extension CheckedAsyncStream: Sendable, SendableMetatype where Element: Sendable {}

It seems that there may be a potential false negative in the sample implementation (not sure if you care about those), specifically in this bit:

public func makeAsyncIterator() -> AsyncIterator {
    let count = tracker.users.withLock { set in
        set.count
    }
    if count > 0 {
        assertionFailure("CheckedAsyncStream is being shared!")
    }

If two calls to makeAsyncIterator() occur concurrently, they could both read set.count as zero before either inserts a new entry to track its iterator, and then neither will hit the assertion, when presumably one of them should.


I have more thoughts on this that I don't have time to write down now, but one thing I wanted to mention is that the throwing variant of AsyncStream does implement runtime checks for concurrent use. IIRC it may have slightly different semantics than what you propose, because it doesn't track how many iterators have been created, but rather whether there are multiple suspended 'subscribers' when delivering an element.
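To make the contention-based idea concrete, here is a minimal sketch of a hypothetical channel (ContentionCheckedChannel is an illustrative name, not the actual AsyncThrowingStream implementation). It reports misuse only when a second consumer suspends in next() while another consumer is already waiting. It uses Foundation's NSLock in place of Mutex, omits termination, and prints instead of crashing.

```swift
import Foundation

/// Hypothetical single-consumer channel. Misuse is reported only on actual
/// contention: a consumer suspending while another one is already waiting.
/// Finishing/termination is omitted to keep the sketch short.
final class ContentionCheckedChannel<Element: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var buffer: [Element] = []
    private var waiters: [CheckedContinuation<Element, Never>] = []
    private var misuseCount = 0

    /// Number of consumers currently suspended in next().
    var pendingWaiters: Int {
        lock.lock(); defer { lock.unlock() }
        return waiters.count
    }

    /// Number of times contention was detected.
    var misuseReports: Int {
        lock.lock(); defer { lock.unlock() }
        return misuseCount
    }

    /// Hands the value to the oldest waiter, or buffers it if nobody is waiting.
    func yield(_ value: Element) {
        lock.lock()
        guard !waiters.isEmpty else {
            buffer.append(value)
            lock.unlock()
            return
        }
        let waiter = waiters.removeFirst()
        lock.unlock()
        waiter.resume(returning: value)
    }

    func next() async -> Element {
        await withCheckedContinuation { continuation in
            lock.lock()
            if !buffer.isEmpty {
                // A value is ready: no suspension, so no contention is possible.
                let value = buffer.removeFirst()
                lock.unlock()
                continuation.resume(returning: value)
                return
            }
            if !waiters.isEmpty {
                // Another consumer is already suspended: report instead of crashing.
                misuseCount += 1
                print("warning: next() awaited from more than one task at once")
            }
            waiters.append(continuation)
            lock.unlock()
        }
    }
}
```

Note that two consumers that strictly take turns never trigger this check, because at most one of them is ever suspended at a time.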

if two calls to makeAsyncIterator() occur concurrently, they could both read set.count as zero before inserting new values to track extant iterators

Ah, yeah. I should have merged the read and write into one access to the mutex. Thanks for taking the time to give this a read.
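A minimal sketch of that fix, using a hypothetical SubscriberTracker type: the emptiness check and the insert happen in a single critical section, so two concurrent makeAsyncIterator() calls cannot both observe an empty set. NSLock stands in for Mutex here only to avoid OS availability requirements; with Mutex, the same logic would live inside one withLock closure.

```swift
import Foundation

/// Hypothetical tracker where "check" and "insert" are one atomic step.
final class SubscriberTracker: @unchecked Sendable {
    private let lock = NSLock()
    private var users: Set<ObjectIdentifier> = []

    /// Registers `token` and returns `true` if no other subscriber was live,
    /// or `false` if sharing was detected. Both steps run under one lock.
    func register(_ token: AnyObject) -> Bool {
        lock.lock()
        defer { lock.unlock() }
        let wasEmpty = users.isEmpty
        users.insert(ObjectIdentifier(token))
        return wasEmpty
    }

    /// Removes `token`, e.g. from the sentinel's deinit or on cancellation.
    func unregister(_ token: AnyObject) {
        lock.lock()
        defer { lock.unlock() }
        users.remove(ObjectIdentifier(token))
    }
}
```

In CheckedAsyncStream, makeAsyncIterator() would then call register(_:) once and assert on a false result, instead of reading set.count first and inserting later in DeallocationTracker.init.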

but whether there are multiple suspended 'subscribers' when delivering an element.

This sounds like a much easier way to do this. I like that it only asserts when the consequences actually occur, though the one advantage my approach has is that it blows up without any delay; I think the existing version would potentially suspend first, especially with the older version of next(). But maybe that is not an issue in practice.

I think the obvious follow-up question is: "Why doesn't regular AsyncStream do this?"

The history there has to do with regressions: some applications were crashing because of concurrent iteration, so the consuming behavior was relaxed so that, instead of crashing, it falls into undefined (or more correctly, weakly defined) behavior. I agree it was perhaps a misstep that the throwing variant still calls fatalError on concurrent consumption. The main reason for relaxing it was that, at the time the problem was found, we had practically no tools to reliably diagnose that a stream was being concurrently iterated.

Looking at it retrospectively, AsyncSequence could have been made ~Copyable, but alas that wasn't available when those types were created. I have done some research on what it would take to allow that, but we are still missing the tools to do it.

The major reasoning to relax it was that at the point in time it was found we had practically no tools to be able to reliably diagnose that it was being concurrently iterated.

Does that include the current approach in AsyncThrowingStream? (I'm writing the rest of this post assuming that that approach is considered good enough)

I agree it was perhaps a misstep that the throwing variant still fatal errors on concurrent consumption

Assuming that y'all are satisfied with the current impl in AsyncThrowingStream: personally, I think the best behavior for the codebase I'm currently working in would be for both of them to detect multiple awaiters the same way, and then call some dynamically dispatched function when the multiple subscriber behavior was detected, which by default would just print that this had occurred (maybe with the name of the Tasks that instigated it if that information was available).

Then codebases like mine, which care a great deal about catching this sort of behavior in debug builds, could replace the default function with assertionFailure() or the like, maybe using @_dynamicReplacement(for:) or just regular dlsym chicanery. (Happy to provide a code snippet if the prose alone isn't enough to convey what I'm describing.)
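As a rough illustration of the "print by default, replace in debug builds" idea, here is a sketch that swaps in a lock-protected, settable closure for @_dynamicReplacement(for:) or dlsym. StreamMisuseReporter, setHandler(_:), and report(_:) are hypothetical names, not existing standard library API.

```swift
import Foundation

/// Hypothetical reporting hook for "more than one consumer" misuse.
/// A settable closure stands in for dynamic replacement or dlsym tricks.
final class StreamMisuseReporter: @unchecked Sendable {
    typealias Handler = @Sendable (String) -> Void

    static let shared = StreamMisuseReporter()

    private let lock = NSLock()
    // Default behavior: just log that misuse happened.
    private var handler: Handler = { message in
        print("warning: \(message)")
    }

    /// Replaces the handler, e.g. with assertionFailure in debug builds.
    func setHandler(_ newHandler: @escaping Handler) {
        lock.lock()
        defer { lock.unlock() }
        handler = newHandler
    }

    /// Called by a stream when it detects more than one waiting consumer.
    /// The handler runs outside the lock so it may itself call setHandler.
    func report(_ message: String) {
        lock.lock()
        let current = handler
        lock.unlock()
        current(message)
    }
}
```

A debug build could call StreamMisuseReporter.shared.setHandler { assertionFailure($0) } at launch, while release builds keep the print-only default.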

I can't really speak to other types of Swift codebases, but this seems like it would be desirable in many iOS apps, at least during development. Hopefully the potential performance impact isn't big enough to outweigh the benefits, though if the check is already on in AsyncThrowingStream, I assume its cost is already considered acceptable.

Does this seem like a reasonable change to make? If so, can I just put up a pull request or does this need a full SE or some other process?

Retrospectively looking at it, AsyncSequence could have been made as ~Copyable but alas that wasn't available at the creation of those types. I have done some research on what would it take to allow that but we still are missing tools to do that.

I guess that means I couldn't do this myself in a wrapper either. Just curious, what are the missing tools?

So the behavior was relaxed, and it likely could only be relaxed (not more strongly enforced), since any stricter change would create a distinct crasher that apps in the wild would hit. You wouldn't want the apps your customers paid for to mysteriously start crashing with no change on your part.

The problem is that the assertion only fired in some cases of the misbehavior. Other cases would skate past it and never fail outright because, even though the iterations may have been concurrent, they weren't in direct contention (which is what that assertion was guarding against).
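The gap can be shown with a small, hypothetical model that replays a trace of events rather than running a real stream. Two consumers share one stream but take turns, so neither is ever suspended while the other is waiting. An iterator-count check (in the spirit of CheckedAsyncStream) flags that trace, while a contention check (in the spirit of the AsyncThrowingStream assertion) does not.

```swift
/// Events in a hypothetical trace of consumers sharing one stream.
enum StreamEvent {
    case makeIterator(consumer: Int)
    case beginAwait(consumer: Int)
    case receive(consumer: Int)
}

/// Flags sharing as soon as a second consumer creates an iterator.
/// Iterators are never released in these traces, to keep the model small.
func iteratorCountCheckFires(_ trace: [StreamEvent]) -> Bool {
    var liveIterators = Set<Int>()
    for event in trace {
        guard case .makeIterator(let consumer) = event else { continue }
        if !liveIterators.isEmpty && !liveIterators.contains(consumer) { return true }
        liveIterators.insert(consumer)
    }
    return false
}

/// Flags sharing only when a consumer starts awaiting while another is suspended.
func contentionCheckFires(_ trace: [StreamEvent]) -> Bool {
    var suspended = Set<Int>()
    for event in trace {
        switch event {
        case .makeIterator:
            break
        case .beginAwait(let consumer):
            if !suspended.isEmpty { return true }
            suspended.insert(consumer)
        case .receive(let consumer):
            suspended.remove(consumer)
        }
    }
    return false
}

// Consumers 0 and 1 share the stream but strictly take turns.
let takingTurns: [StreamEvent] = [
    .makeIterator(consumer: 0), .makeIterator(consumer: 1),
    .beginAwait(consumer: 0), .receive(consumer: 0),
    .beginAwait(consumer: 1), .receive(consumer: 1),
]

// Both consumers are suspended at the same time.
let contended: [StreamEvent] = [
    .makeIterator(consumer: 0), .makeIterator(consumer: 1),
    .beginAwait(consumer: 0), .beginAwait(consumer: 1),
    .receive(consumer: 0), .receive(consumer: 1),
]
```

In this model, takingTurns is caught only by the iterator-count check, while contended is caught by both.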

The missing tools are the same reasons why we don't have a non-copyable sequence type.

So the behavior was relaxed and likely could only be relaxed (not more strongly enforced) as any sort of alteration due to it creating a distinct crasher that apps in the wild would face – you wouldn't want your customers to mysteriously just start having their apps they paid for start crashing for no change on your part.

Agreed, but I don't think what I want to do leads to that outcome. What I'm suggesting is that both AsyncStream and AsyncThrowingStream include code that detects this case, but log instead of crashing.

The logging could be considered stronger enforcement in the case of AsyncStream, I suppose, but it's well short of a crash. The main consequence for users could be a lot of logs (this could also be configurable, in which case they'd only pay the performance cost of checking whether to run the check or whether to log).

Sorry if that wasn't communicated effectively in my last message.

even though the iterations may be concurrent they weren't in direct contention

IMO, something is better than nothing here, as long as there are no false positives. But I can definitely see why you wouldn't want to invest in this further when it's not 100% reliable.

I think if we had an appropriate way of logging at that level, I would be willing to advocate for relaxing the throwing version to be congruent with the non-throwing version.

(If we had a way of raising a runtime issue, that would be neat too.)

In my opinion, this has become more relevant and is worth exploring further. With the introduction of the share operator in AsyncAlgorithms, where adding or omitting a single line of code can make a huge difference, perhaps we could generalize this 'warning' to AsyncSequence as a whole?

P.S. A somewhat related and interesting post about the difference between AsyncStream and AsyncThrowingStream: 'Consuming an AsyncStream from multiple Tasks' Redux

I did a prototype here. It passes all the unit tests in async_stream.swift, but I haven't tested it beyond that.

I brought the array of continuations from AsyncStream into AsyncThrowingStream, and replaced fatalError with a function that calls logFailedCheck (the same function used by CheckedContinuation to alert on leaks).

The new implementation has a quirk: if an AsyncThrowingStream throws, only the first consumer to await receives the error, and all the others simply finish. I don't think this is a real problem in practice, since it only arises when consumers are already misusing the API.
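The quirk can be illustrated with a small synchronous model (hypothetical, not the prototype's code) in which suspended consumers are stored as callbacks standing in for continuations. On a throwing finish, the first waiter gets the error and every other waiter is resumed with nil, so it just sees the end of the sequence.

```swift
/// Hypothetical error used to terminate the modeled stream.
struct StreamError: Error, Equatable {}

/// Hypothetical model of the consumers suspended on a throwing stream.
/// Each waiter is a callback standing in for a stored continuation.
final class PendingWaiters<Element> {
    typealias Waiter = (Result<Element?, StreamError>) -> Void
    private var waiters: [Waiter] = []

    /// Records a consumer that is suspended in next().
    func suspend(_ waiter: @escaping Waiter) {
        waiters.append(waiter)
    }

    /// Terminates the stream. Only the first waiter sees the error;
    /// every other waiter is resumed with nil (plain end of sequence).
    func finish(throwing error: StreamError?) {
        let pending = waiters
        waiters.removeAll()
        for (index, waiter) in pending.enumerated() {
            if index == 0, let error {
                waiter(.failure(error))
            } else {
                waiter(.success(nil))
            }
        }
    }
}
```

Since a correctly used stream has at most one waiter, this only changes behavior for code that is already sharing the stream.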

To enable replacement implementations to add logging or IDE runtime issue markers, I'm currently using a normal Swift function marked @inline(never) as a wrapper around the logging, but it seems like maybe the pattern in ConcurrencyHooks.cpp is more appropriate for this?

Just wanted to circle back on this: did this seem like a reasonable change? Should I just do a normal pull request, or is there something else I should be doing to move this forward?

The forums don't seem to be showing a link for that. Can you just tag me on your PR as a reviewer?