Combine and Swift Concurrency Interop: Unexpectedly dropping values

Hi there!

I am still learning swift concurrency so I admittedly might be missing some fundamental concepts that are leading to unexpected results.

Anyways, I am aware that other topics here say to tread carefully when mixing combine and swift concurrency as they tend to have different (and sometimes conflicting) opinions on how to deal with async and concurrent code.

However, I think my question still stands. Combine "conveniently" bridges to swift concurrency with the .values property on publisher... so surely it should work for relatively simple situations, right? :man_shrugging:

My situation is this: a single CurrentValueSubject which is "subscribed to" via a for-await-in loop. The subject is concurrently sent new values from many individual Tasks. I would expect the for-await-in loop to receive all values in indeterminate order. However, the for-await-in loop drops values. Interestingly, if I subscribe to the subject with a sink subscriber, I receive all of the values, consistently.

Aside: it seems that somewhere within Combine, a lock of some kind is being used to guarantee that subscribers do not receive values concurrently making them thread-safe from things like memory races (but not logical races). Can folks confirm this?

I think the reason for the distinction between "sinks" and "for-await-in" is because the sink subscriber requests unlimited values while the for-await-in loop requests a .max(1) between each iteration and the concurrent sends overwhelm it, thus dropped values.

I'll paste some sample code below to demonstrate this situation. However, my main question is this: this feels just too easy to get wrong. Or am I missing something very obvious and fundamental here that is clearly a misuse of the the two frameworks? It's situations like these that help me better understand the tools available to us so any explanation or confirmation to my theory above would be greatly appreciated, thanks!

Here's the sample code. Two tests: the first subscribes with a sink subscriber. This consistently passes. The second tests subscribes via for-await-in which consistently fails.

    func testConcurrentSendWithSink() {
        let subject = CurrentValueSubject<Int, Never>(0)
        let subjectWithHandler = subject.handleEvents(
            receiveRequest: { request in
                print("received request: \(request)")
            }
        )

        var receivedValues = Set<Int>()
        let exp = expectation(description: "")
        subjectWithHandler
            .sink(
            receiveCompletion: { _ in
                print("received completion")
                exp.fulfill()
            },
            receiveValue: {
                print("sink subscriber received: \($0) on ", { Thread.current }())
                receivedValues.insert($0)
            }
        )
        .store(in: &subscriptions)

        Task {
            await withTaskGroup(of: Void.self) { group in
                (1...10).forEach { i in
                    group.addTask {
                        print("sending \(i) from ", { Thread.current }())
                        subject.send(i)
                    }
                }

                await group.waitForAll()
                subject.send(completion: .finished)
            }
        }

        waitForExpectations(timeout: 3)

        XCTAssertEqual(receivedValues, Set(0...10))
    }

    func testConcurrentSendWithAsyncSequence() async {
        let subject = CurrentValueSubject<Int, Never>(0)
        let subjectWithHandler = subject.handleEvents(
            receiveRequest: { request in
                print("received request: \(request)")
            }
        )


        let receiverTask = Task {
            var receivedValues = Set<Int>()
            for await value in subjectWithHandler.values {
                print("async sequence received: \(value) on ", { Thread.current }())
                receivedValues.insert(value)
            }
            return receivedValues
        }

        Task {
            // small sleep to let the above task await values before we start sending
            try await Task.sleep(for: .seconds(0.1))
            await withTaskGroup(of: Void.self) { group in
                (1...10).forEach { i in
                    group.addTask {
                        print("sending \(i) from ", { Thread.current }())
                        subject.send(i)
                    }
                }

                await group.waitForAll()
                subject.send(completion: .finished)
            }
        }

        let receivedValues = await receiverTask.value
        XCTAssertEqual(receivedValues, Set(0...10))
    }

It is often useful to add a .buffer in the mix when using .values if demand is expected to be non-1-by-1.

1 Like

Yep, that fixes the broken test. Thanks for the suggestion there.

Adding a .buffer was not intuitive to me but maybe it should have been. Is it a bit odd that the onus is on the subscriber to know if values may be produced asynchronously or concurrently and thus, will require a buffer?

After a bit more playing around, the dropped values aren't actually due to concurrently producing values. If I annotate the sending and receiving tasks with @MainActor, the for-await-in still drop values. Is the real problem here just the non-deterministic scheduling of swift concurrency coupled with for-await-in only requests 1 value at a time? In the second test code above, a batch of the child "producer" tasks run back to back before the subscriber task is given a chance to run and receive the produced values (hence the dropping).

Thanks again for the .buffer tip! That certainly helps me understand things better.