Hi! I'm experimenting with a type that listens to values from an AsyncStream instance and then updates some state when those values are received. I'm having some trouble writing a unit test to confirm my Listener is doing the right thing. This is mostly a question about best practices for testing async code (I do believe I want my Listener to make use of AsyncStream… but I don't believe AsyncStream itself is what is directly impacting my ability to test this type).
Here is an example of my production class:
final public class Listener<Element> {
    public var count = 0
    private var task: Task<Void, Never>?
    deinit {
        self.task?.cancel()
    }
    public func listen(to stream: AsyncStream<Element>) {
        self.count = 0
        self.task?.cancel()
        self.task = Task { [weak self] in
            for await _ in stream {
                guard Task.isCancelled == false else { return }
                self?.count += 1
            }
        }
    }
}
It's pretty simple… but it demonstrates what my code is trying to do. We have an API that receives an AsyncStream instance and begins an async iteration over the stream values. Every stream value received just increments a counter value. I would also like this Listener to cancel the stream when this class instance is no longer being referenced (AFAIK this does not come for free with an arbitrary Task instance).
My two goals with unit tests would be to confirm:
- Passing a new value to my stream continuation increments my count.
- Releasing my reference on a listener instance cancels my stream continuation.
As you can imagine… the Task dependency means it's not completely easy to test that my count is incrementing correctly.[1] One workaround is to put my test to sleep:
@Suite actor ListenerTests {
    @Test func listen() async throws {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        nonisolated(unsafe) var didCancel = false
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                didCancel = true
            }
        }
        do {
            let listener = Listener<Int>()
            listener.listen(to: stream)
            continuation.yield(1)
            continuation.yield(2)
            continuation.finish()
            try await Task.sleep(nanoseconds: 1_000_000)
            #expect(listener.count == 2)
        }
        #expect(didCancel == false)
    }
    @Test func listenAndCancel() async throws {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        nonisolated(unsafe) var didCancel = false
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                didCancel = true
            }
        }
        do {
            let listener = Listener<Int>()
            listener.listen(to: stream)
            continuation.yield(1)
            try await Task.sleep(nanoseconds: 1_000_000)
            #expect(listener.count == 1)
        }
        #expect(didCancel == true)
    }
}
Both of these tests "pass"… which is good. As a last resort… I would be OK with sleeping. I would also like to keep thinking about this to see if any new (or "modern") patterns have emerged for this situation. I'm also hoping to do this all in swift-testing (which means I'm not planning to make use of an XCTWaiter to block or suspend).
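One possible middle ground between a fixed sleep and a redesign is to poll for the expected state with an upper bound on the wait. The sketch below is only an illustration: `waitUntil`, `timeout`, and `pollInterval` are hypothetical names, not part of swift-testing or the standard library, and the timeout is approximate because it only counts time spent sleeping. It still relies on `Task.sleep(nanoseconds:)`, and reading `count` from the test while the listener's task may be writing it is as unsynchronized as in the tests above.

```swift
// Hypothetical test helper (not part of swift-testing or the standard library).
// Polls `condition` until it returns true, or until roughly `timeout`
// nanoseconds of sleeping have accumulated. Returns whether the condition
// was met before the timeout.
func waitUntil(
    timeout: UInt64 = 1_000_000_000,
    pollInterval: UInt64 = 1_000_000,
    _ condition: () -> Bool
) async throws -> Bool {
    var slept: UInt64 = 0
    while condition() == false {
        // Give up once the accumulated sleep time reaches the timeout.
        guard slept < timeout else { return false }
        try await Task.sleep(nanoseconds: pollInterval)
        slept += pollInterval
    }
    return true
}
```

In a test like listen() above, the fixed sleep could then become something like `let reached = try await waitUntil { listener.count == 2 }` followed by `#expect(reached)`. The test returns as soon as the count is correct, and a slow machine gets up to the full timeout instead of failing after one millisecond.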
I have an attempt at a new version of Listener that accepts a TaskGroup:
final public class ListenerTwo<Element> {
    public var count = 0
    private var task: Task<Void, Never>?
    deinit {
        self.task?.cancel()
    }
    public func listen(to stream: AsyncStream<Element>) {
        self.task?.cancel()
        self.task = Task { [weak self] in
            await withTaskGroup(of: Void.self) { group in
                self?.listen(
                    to: stream,
                    with: &group
                )
            }
        }
    }
    package func listen(
        to stream: AsyncStream<Element>,
        with group: inout TaskGroup<Void>
    ) {
        self.count = 0
        group.addTask { [weak self] in
            for await _ in stream {
                guard Task.isCancelled == false else { return }
                self?.count += 1
            }
        }
    }
}
For the most part… this looks like it would give my unit test a more legit path to test the behavior (by passing an arbitrary TaskGroup). My public API still spawns its own Task and wraps the package API. I think this might be the right direction… but it's also not great because (AFAIK) I have no ability to "save" the child task that is being added to a TaskGroup.[2] This seems to imply that I can use this package API to validate that my counter is incrementing without a sleep… but I don't see any way for this package API to cancel my stream continuation when my Listener is released. I also see no way to call this package API more than once and confirm the "previous" stream is no longer performing any work (because I lost the ability to cancel the last continuation).
Here are some tests to demonstrate:
@Suite actor ListenerTwoTests {
    @Test func listen() async throws {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        nonisolated(unsafe) var didCancel = false
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                didCancel = true
            }
        }
        do {
            let listener = ListenerTwo<Int>()
            listener.listen(to: stream)
            continuation.yield(1)
            continuation.yield(2)
            continuation.finish()
            try await Task.sleep(nanoseconds: 1_000_000)
            #expect(listener.count == 2)
        }
        #expect(didCancel == false)
    }
    @Test func listenAndCancel() async throws {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        nonisolated(unsafe) var didCancel = false
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                didCancel = true
            }
        }
        do {
            let listener = ListenerTwo<Int>()
            listener.listen(to: stream)
            continuation.yield(1)
            try await Task.sleep(nanoseconds: 1_000_000)
            #expect(listener.count == 1)
        }
        #expect(didCancel == true)
    }
    @Test func listenWithGroup() async throws {
        let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
        nonisolated(unsafe) var didCancel = false
        continuation.onTermination = { termination in
            if case .cancelled = termination {
                didCancel = true
            }
        }
        do {
            let listener = ListenerTwo<Int>()
            await withTaskGroup(of: Void.self) { group in
                listener.listen(
                    to: stream,
                    with: &group
                )
                continuation.yield(1)
                continuation.yield(2)
                continuation.finish()
                await group.waitForAll()
            }
            #expect(listener.count == 2)
        }
        #expect(didCancel == false)
    }
}
These tests pass… but I would really like some way for listener.listen(to:with:) to also offer the ability to cancel its child task somehow.
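One direction that might keep both properties is for the test-facing API to return the unstructured Task instead of accepting a TaskGroup. The listener still stores the task (so deinit and repeated calls can cancel it), and a test can await the task's value instead of sleeping. This is a minimal sketch under assumptions: `HandleListener`, `start(listeningTo:)`, `countAfterFinish`, and `cancelsOnRelease` are hypothetical names, the test-facing method is left internal here (it could be `package` in a package build), the code is assumed to build in Swift 5 language mode on Swift 5.10 or later (for `nonisolated(unsafe)`), and `count` is as unsynchronized as in the original class.

```swift
// Hypothetical variant of Listener; all names here are illustrative.
final class HandleListener<Element> {
    var count = 0
    private var task: Task<Void, Never>?

    deinit {
        task?.cancel()
    }

    // Same shape as the original public API.
    func listen(to stream: AsyncStream<Element>) {
        start(listeningTo: stream)
    }

    // Test-facing API: same behavior, but hands back the stored task so a
    // caller can await its completion.
    @discardableResult
    func start(listeningTo stream: AsyncStream<Element>) -> Task<Void, Never> {
        count = 0
        task?.cancel()
        let task = Task { [weak self] in
            for await _ in stream {
                guard Task.isCancelled == false else { return }
                self?.count += 1
            }
        }
        self.task = task
        return task
    }
}

// Yields two values, finishes the stream, then waits for the task to end.
func countAfterFinish() async -> Int {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    let listener = HandleListener<Int>()
    let task = listener.start(listeningTo: stream)
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish()
    await task.value
    return listener.count
}

// Releases the listener and reports whether the stream saw a cancellation.
func cancelsOnRelease() async -> Bool {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
        if case .cancelled = termination {
            didCancel = true
        }
    }
    var listener: HandleListener<Int>? = HandleListener<Int>()
    let task = listener!.start(listeningTo: stream)
    listener = nil  // deinit cancels the stored task
    await task.value
    return didCancel
}
```

In swift-testing, the two helper functions would become test bodies ending in `#expect(listener.count == 2)` and `#expect(didCancel == true)`. One limitation of this sketch: awaiting the task only works once the stream finishes or the task is cancelled, so it does not help with asserting an intermediate count while the stream is still open.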
Any ideas or patterns here that might approach this in a different way? Are there any additional concurrency APIs I might not be familiar with that could unlock the ability to test this in a better way?
My tests are building from Swift 6… but my goal is for my production types to build from 5.10 and up. I'm currently deploying back to .macOS(.v10_15)… but I would consider moving that up if there was some more modern API that I would want. Thanks!