i would like to write a test that runs some checks only after an AsyncStream has multiple pending consumers awaiting its next value. i've yet to come up with an approach that isn't exposed to some amount of non-determinism. the aim would be to get something like this:
let (stream, producer) = AsyncStream<Int>.makeStream()
let task1 = Task {
    for await value in stream {
        ...
    }
}
let task2 = Task {
    for await value in stream {
        ...
    }
}
// do something that ensures both Tasks are awaiting the stream's values
// make some test assertions
can anyone think of an approach to achieve this? or alternatively, argue that it cannot be done.
Could you use a spy around your stream that conforms to AsyncSequence and counts how many times each method/property is accessed? Then you can check that at least 2 consumers have started iterating by that point. You would just need to safely handle the fact that the Tasks may not start running right away.
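A minimal sketch of what such a spy could look like, assuming a wrapper that only needs to support AsyncStream. `SpyStream` and `NextCallCounter` are hypothetical names made up for illustration, not standard library types. Note that the counter is incremented just before the wrapped `next()` is called, so it only shows that a consumer has reached the call, not that it is already suspended inside the stream.

```swift
import Foundation

// Hypothetical helper: a thread-safe counter shared by all iterators of one spy.
final class NextCallCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var count = 0

    // Number of `next()` calls recorded so far.
    var value: Int {
        lock.lock()
        defer { lock.unlock() }
        return count
    }

    func increment() {
        lock.lock()
        defer { lock.unlock() }
        count += 1
    }
}

// Hypothetical spy: forwards iteration to the wrapped AsyncStream and records
// every call to `next()` made through any of its iterators.
struct SpyStream<Element: Sendable>: AsyncSequence, Sendable {
    let base: AsyncStream<Element>
    let counter = NextCallCounter()

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: AsyncStream<Element>.Iterator
        let counter: NextCallCounter

        mutating func next() async -> Element? {
            // Recorded before forwarding, so the consumer may not be suspended yet.
            counter.increment()
            return await base.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: base.makeAsyncIterator(), counter: counter)
    }
}
```

The consuming Tasks would iterate the `SpyStream` instead of the raw stream, and the test would wait until `counter.value` reaches 2 before asserting, which still leaves a small window between the count changing and the consumer actually suspending.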
that's a good idea, but i actually want to test the stream's behavior itself when it has multiple suspended consumers, so adding a proxy won't quite be sufficient.
for posterity, this is the gist of what i settled on:
let (stream, producer) = AsyncStream<Int>.makeStream()
let (controlStream, controlProducer) = AsyncStream<Void>.makeStream()
var controlConsumer = controlStream.makeAsyncIterator()
let task1 = Task { @MainActor in
    controlProducer.yield()
    for await value in stream {
        // ...
    }
}
await controlConsumer.next()
// task1 is now either running code after the `yield` or awaiting stream elements
await MainActor.run {}
// task1 must now be awaiting stream elements
// repeat for additional consumers
the secondary stream can be replaced with use of withXYZContinuation, but for my use case having a feedback channel was useful (and i'm not sure you can return the Task via the continuation approach).
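a rough sketch of the continuation-based variant, for comparison. `startConsumer(of:)` is a hypothetical helper name, and the consumer here just collects values so there is something to return. it hands the Task back by assigning it to a local variable from inside the continuation body, and it relies on the same assumption as the snippet above: that the consumer keeps the main actor from the `resume()` call until it actually suspends on the stream.

```swift
// Hypothetical helper: starts a main-actor consumer and returns once that
// consumer has had the chance to reach its first suspension on the stream.
func startConsumer(of stream: AsyncStream<Int>) async -> Task<[Int], Never> {
    var task: Task<[Int], Never>?

    await withCheckedContinuation { (started: CheckedContinuation<Void, Never>) in
        // The body runs synchronously, so the Task handle can be captured here.
        task = Task { @MainActor in
            // Signal that the consumer is running on the main actor.
            started.resume()
            var received: [Int] = []
            for await value in stream {
                received.append(value)
            }
            return received
        }
    }

    // The consumer is now either running code after `resume()` or awaiting
    // stream elements. Hopping through the main actor waits for it to give
    // the actor up (assumed to happen only when it suspends on the stream).
    await MainActor.run {}

    // `task` was assigned inside the continuation body above.
    return task!
}
```

calling it once per consumer (`let task1 = await startConsumer(of: stream)`, then again for `task2`) gives the same shape as the control-stream version, minus the reusable feedback channel.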