Testing an async subscriber for a Combine publisher

My goal is to implement a version of the `sink` operator from Combine that can run asynchronous code. I’m targeting Swift 5 language mode.

Here’s what I came up with:

import Combine

extension Publisher where Failure == Never
{
    func asyncSink(name: String? = nil,
                   bufferingPolicy: AsyncStream<Output>.Continuation.BufferingPolicy = .unbounded,
                   receiveValue: @escaping @isolated(any) (Output) async -> Void) -> some Cancellable
    {
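        // Bridge the publisher into an AsyncStream. The buffering policy decides what
        // happens to values that arrive while `receiveValue` is still running.
        let values = AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
            let cancellable = sink { _ in
                continuation.finish()
            } receiveValue: { value in
                continuation.yield(value)
            }
            
            // Cancel the upstream subscription once the stream finishes or its consumer is cancelled.
            // The closure's parameter is the termination reason, which isn't needed here.
            continuation.onTermination = { _ in
                cancellable.cancel()
            }
        }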

        return Task(name: name) {
            for await value in values {
                await receiveValue(value)
            }
        }
    }
}

extension Task: @retroactive Cancellable {
}

Now I’m trying to test this wrapper. One of the things I’d like to test is the backpressure behavior (the scenario where the publisher emits values faster than the subscriber can process them).
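
To make the dropping behavior I expect concrete, here is a minimal sketch of the buffering policy in isolation, with no Combine and no timing involved. The `drain` helper is a hypothetical name used only for illustration, and the sketch assumes `AsyncStream.makeStream(of:bufferingPolicy:)` is available (Swift 5.9 or later):

```swift
// Hypothetical helper (not part of asyncSink): yields every value before any
// consumer is running, then drains the stream to show what the policy kept.
func drain(_ values: [Int],
           policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int]
{
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self,
                                                        bufferingPolicy: policy)

    // No one is awaiting the stream yet, so each yield goes into the buffer.
    for value in values {
        continuation.yield(value)
    }
    continuation.finish()

    // Values still in the buffer are delivered before the stream ends.
    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Calling `await drain([1, 2, 3], policy: .bufferingNewest(1))` should return `[3]`, while `.unbounded` should return `[1, 2, 3]`. This is the same mechanism I'm relying on in the test: a value that arrives while the handler is busy sits in the one-slot buffer and is replaced by any newer value.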

Here’s how I’m currently doing this:

import Combine
import Foundation
import Testing

@MainActor
struct CombineLatestTests {

    @Test
    func `combineLatest subscriber receives all values`() async throws
    {
        let start = Date.now
        let elapsedTime = {
            String(format: "%.3f", Date.now.timeIntervalSince(start))
        }
        
        // Given
        let subject = PassthroughSubject<Int, Never>()
        var receivedValues: [Int] = []

        let subscription = subject
            .asyncSink(bufferingPolicy: .bufferingNewest(1)) { value in
                print("\(elapsedTime()): Received \(value)")
                receivedValues.append(value)
                try? await Task.sleep(for: .milliseconds(10))
            }

        // When
        try await sendValues(1, 2, 3, to: subject, withInterval: .milliseconds(3), elapsedTime: elapsedTime)
        try await Task.sleep(for: .milliseconds(100))
        
        subscription.cancel()
        
        // Then
        print("\(elapsedTime()): Done! \(receivedValues)")

        #expect(receivedValues == [1, 3])
    }
}

@concurrent
func sendValues<Value>(_ values: Value...,
                       to subject: some Subject<Value, Never>,
                       withInterval interval: ContinuousClock.Duration = .milliseconds(3),
                       elapsedTime: () -> String) async throws
{
    for value in values
    {
        print("\(elapsedTime()): Sending \(value)")
        subject.send(value)
        try await Task.sleep(for: interval)
    }
}

This test seems to work:

0.000: Sending 1
0.000: Received 1
0.004: Sending 2
0.008: Sending 3
0.011: Received 3
0.116: Done! [1, 3]

I ran this test 1,000 times in a row without a single failure. However, earlier iterations were flaky, so I’m not sure whether something is still wrong. In general, I’m still learning Swift Concurrency (especially the latest changes in Swift 6.2), so I’m not confident that I know what I’m doing.

Does anyone have any feedback?