My goal is to implement a version of the `sink` operator from Combine that can run asynchronous code. I’m targeting Swift 5 language mode.
Here’s what I came up with:
extension Publisher where Failure == Never
{
func asyncSink(name: String? = nil,
bufferingPolicy: AsyncStream<Output>.Continuation.BufferingPolicy = .unbounded,
receiveValue: @escaping @isolated(any) (Output) async -> Void) -> some Cancellable
{
let values = AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
let cancellable = sink { _ in
continuation.finish()
} receiveValue: { value in
continuation.yield(value)
}
continuation.onTermination = { continuation in
cancellable.cancel()
}
}
return Task(name: name) {
for await value in values {
await receiveValue(value)
}
}
}
}
extension Task: @retroactive Cancellable {
}
Now I’m trying to test this wrapper. One of the things I’d like to test is the backpressure behavior (the scenario where the publisher emits values faster than the subscriber can process them).
Here’s how I’m currently doing this:
@MainActor
struct CombineLatestTests {
@Test
func `combineLatest subscriber receives all values`() async throws
{
let start = Date.now
let elapsedTime = {
String(format: "%.3f", Date.now.timeIntervalSince(start))
}
// Given
let subject = PassthroughSubject<Int, Never>()
var receivedValues: [Int] = []
let subscription = subject
.asyncSink(bufferingPolicy: .bufferingNewest(1)) { value in
print("\(elapsedTime()): Received \(value)")
receivedValues.append(value)
try? await Task.sleep(for: .milliseconds(10))
}
// When
try await sendValues(1, 2, 3, to: subject, withInterval: .milliseconds(3), elapsedTime: elapsedTime)
try await Task.sleep(for: .milliseconds(100))
subscription.cancel()
// Then
print("\(elapsedTime()): Done! \(receivedValues)")
#expect(receivedValues == [1, 3])
}
}
@concurrent
func sendValues<Value>(_ values: Value...,
to subject: some Subject<Value, Never>,
withInterval interval: ContinuousClock.Duration = .milliseconds(3),
elapsedTime: () -> String) async throws
{
for value in values
{
print("\(elapsedTime()): Sending \(value)")
subject.send(value)
try await Task.sleep(for: interval)
}
}
This test seems to work:
0.000: Sending 1
0.000: Received 1
0.004: Sending 2
0.008: Sending 3
0.011: Received 3
0.116: Done! [1, 3]
I ran this test repeatedly, 1000 times, there weren’t any failures. However, I had some flakiness in previous iterations , so I’m not sure if something is wrong. In general, I’m still learning Swift Concurrency (especially the latest changes in Swift 6.2), so I can’t say I know what I’m doing.
Does anyone have any feedback?