Unit-Testing and Confirming a Class Updates State with AsyncStream and Task

Hi! I'm experimenting with a type that listens to values from an AsyncStream instance and then updates some state when those values are received. I'm having some challenges writing a unit test to confirm my Listener is doing the right thing. This is a question mostly about best practices for trying to test async code (I do believe I want my Listener to make use of AsyncStream… but I believe that it is not AsyncStream itself is directly impacting my ability to test this type).

Here is an example of my production class:

final public class Listener<Element> {
  public var count = 0
  private var task: Task<Void, Never>?
  
  deinit {
    self.task?.cancel()
  }
  
  public func listen(to stream: AsyncStream<Element>) {
    self.count = 0
    self.task?.cancel()
    self.task = Task { [weak self] in
      for await _ in stream {
        guard Task.isCancelled == false else { return }
        self?.count += 1
      }
    }
  }
}

It's pretty simple… but it demonstrates what my code is trying to do. We have an API that receives an AsyncStream instance and begins an async iteration over the stream values. Every stream value received just increments a counter value. I also would like for this Listener to cancel the stream when this class instance is no longer being referenced (AFAIK this is not out-of-the-box free with an arbitrary Task instance).

My two goals with unit tests would be to confirm:

  • Passing a new value to my stream continuation increments my count.
  • Releasing my reference on a listener instance cancels my stream continuation.

As you can imagine… the Task dependency means it's not completely easy to test my count is incrementing correctly.[1] One workaround is to put my test to sleep:

@Suite actor ListenerTests {
  @Test func listen() async throws {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel = true
      }
    }
    
    do {
      let listener = Listener<Int>()
      listener.listen(to: stream)
      
      continuation.yield(1)
      continuation.yield(2)
      continuation.finish()
      try await Task.sleep(nanoseconds: 1_000_000)
      #expect(listener.count == 2)
    }
    
    #expect(didCancel == false)
  }
  
  @Test func listenAndCancel() async throws {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel = true
      }
    }
    
    do {
      let listener = Listener<Int>()
      listener.listen(to: stream)
      
      continuation.yield(1)
      try await Task.sleep(nanoseconds: 1_000_000)
      #expect(listener.count == 1)
    }
    
    #expect(didCancel == true)
  }
}

Both of these tests "pass"… which is good. As a last-resort… I would be ok with sleeping. I would also like to keep thinking here about this to see if any new (or "modern") patterns have emerged here for this situation. I'm also hoping to do this all in swift-testing (which means I'm not planning to make use of a XCTWaiter to block or suspend).

I have an attempt at a new version of Listener that accepts a TaskGroup:

final public class ListenerTwo<Element> {
  public var count = 0
  private var task: Task<Void, Never>?
  
  deinit {
    self.task?.cancel()
  }
  
  public func listen(to stream: AsyncStream<Element>) {
    self.task?.cancel()
    self.task = Task { [weak self] in
      await withTaskGroup(of: Void.self) { group in
        self?.listen(
          to: stream,
          with: &group
        )
      }
    }
  }
  
  package func listen(
    to stream: AsyncStream<Element>,
    with group: inout TaskGroup<Void>
  ) {
    self.count = 0
    group.addTask { [weak self] in
      for await _ in stream {
        guard Task.isCancelled == false else { return }
        self?.count += 1
      }
    }
  }
}

For the most part… this looks like it would give my unit test a more legit path to test the behavior (by passing an arbitrary TaskGroup). My public API still spawns its own Task and wraps the package API. I think this might be the right direction… but it's also not great because (AFAIK) I have no ability to "save" the child task that is being added to a TaskGroup.[2] This seems to imply that I can use this package API to validate my counter is incrementing without a sleep… but I don't see any ability for this package API to cancel my stream continuation when my Listener is released. I also see no ability to call this package API more than once and confirm the "previous" stream is no longer performing any work (because I lost the ability to cancel the last continuation).

Here are some tests to demonstrate:

@Suite actor ListenerTwoTests {
  @Test func listen() async throws {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel = true
      }
    }
    
    do {
      let listener = ListenerTwo<Int>()
      listener.listen(to: stream)
      
      continuation.yield(1)
      continuation.yield(2)
      continuation.finish()
      try await Task.sleep(nanoseconds: 1_000_000)
      #expect(listener.count == 2)
    }
    
    #expect(didCancel == false)
  }
  
  @Test func listenAndCancel() async throws {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel = true
      }
    }
    
    do {
      let listener = ListenerTwo<Int>()
      listener.listen(to: stream)
      
      continuation.yield(1)
      try await Task.sleep(nanoseconds: 1_000_000)
      #expect(listener.count == 1)
    }
    
    #expect(didCancel == true)
  }
  
  @Test func listenWithGroup() async throws {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    nonisolated(unsafe) var didCancel = false
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel = true
      }
    }
    
    do {
      let listener = ListenerTwo<Int>()
      await withTaskGroup(of: Void.self) { group in
        listener.listen(
          to: stream,
          with: &group
        )
        
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
        
        await group.waitForAll()
      }
      
      #expect(listener.count == 2)
    }
    
    #expect(didCancel == false)
  }
}

These tests pass… but I would really like some way for listener.listen(to:with:) to also offer the ability to cancel its child task somehow.

Any ideas or patterns here that might approach this in a different way? Are there any additional concurrency APIs I might not be familiar with that could unlock the ability to test this in a better way?

My tests are building from Swift 6… but my goal is for my production types to build from 5.10 and up. I'm currently deploying back to .macOS(.v10_15)… but I would consider moving that up if there was some more modern API that I would want. Thanks!


  1. How to test a method inside Task Async/await in swift ↩︎

  2. TaskGroup: Keeping a Reference to Child Tasks? ↩︎

Please watch Go further with Swift Testing. :smile: nonisolated(unsafe) state is specifically something we call out as an anti-pattern. Consider using a confirmation for these tests.

And please be aware Swift Testing requires Swift 6.

2 Likes

Ahh… yes. Here's a new one:

@Test func listenAndCancel() async throws {
  try await confirmation(expectedCount: 1) { didCancel in
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
    continuation.onTermination = { termination in
      if case .cancelled = termination {
        didCancel()
      }
    }
    
    do {
      let listener = ListenerTwo<Int>()
      listener.listen(to: stream)
      
      continuation.yield(1)
      try await Task.sleep(nanoseconds: 1_000_000)
      #expect(listener.count == 1)
    }
  }
}