Is it safe to add @Published to a variable that is mutated from a private queue?

For example:

class SomeObject {
    var values: AnyPublisher<[Int], Never> {
        $_values.eraseToAnyPublisher()
    }
    @Published private var _values: [Int] = []
    private let queue = DispatchQueue(label: "queue")

    func append(_ value: Int) {
        queue.async {
            self._values.append(value)
        }
    }
}

Is the values publisher violating any rules? I ask because the following test is sometimes failing for me (I'm not receiving all ten appended values within five seconds):

func testPublished() {

    let obj = SomeObject()
    for i in (0..<10) {
        obj.append(i)
    }

    let allValuesReceived = XCTestExpectation(description: "all values received")
    let valuesQueue = DispatchQueue(label: "valuesQueue")
    var lastValuesReceived: [Int]?
    let cancellable = obj.values
        .sink { values in
            valuesQueue.sync {
                lastValuesReceived = values
                if values.count == 10 {
                    allValuesReceived.fulfill()
                }
            }
        }

    wait(for: [allValuesReceived], timeout: 5)

    valuesQueue.sync {
        XCTAssertEqual(lastValuesReceived!.sorted(), Array(0..<10))
    }

    cancellable.cancel()
}
1 Like

Part of the issue here is that the subscription to the values is setup after it is starting to async things to assigning. If you did it in the reverse order; setting up the observation of values before asyncing things it should be safe since @Published technically has exclusivity guarantees. It is worth noting however that there is no order guarantee by it when sourced from a concurrent queue.

Mind providing more detail about this? Moving the subscription above the append loop does fix things (thank you for that), but I'm not sure that I understand why. I could imagine the first few events being lost while the subscriber and subscription set up their connection but, if what I've written is thread-safe, shouldn't the subscriber eventually receive the full array?

I've simplified my question a little bit - turns out that I can reproduce the failure with a serial queue.

@Published only stores the last value received for new subscribers. It does not buffer them. So any values sent before the subscription has started aside from the last one will be lost.

But the last value received should always be [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], right? I'm seeing tests finish with lastValuesReceived set to things like [0, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1] and [1, 2, 3, 4, 5, 6, 7, 8, 9].

You're right, your test is unreliable. I'm not sure quite why, but some mix of the extra queues you're using and publishing the values before subscribing might be the issue. Restructuring the test makes it completely reliable without changing your publisher setup.

func testThatSomeObjectPublishesValuesCorrectlyWhenMutatedAfterSubscription() {
    // Given
    let obj = SomeObject()
    let values = 0..<10
    let allValuesReceived = expectation(description: "all values received")
    allValuesReceived.expectedFulfillmentCount = values.count + 1
    var lastValuesReceived: [Int]?
    
    // When
    let cancellable = obj.values
        .receive(on: DispatchQueue.main)
        .sink { values in
            lastValuesReceived = values
            allValuesReceived.fulfill()
        }
    
    for i in (0..<10) {
        obj.append(i)
    }
    
    wait(for: [allValuesReceived], timeout: 0.5)
    
    // Then
    XCTAssertEqual(lastValuesReceived, Array(0..<10))
    
    cancellable.cancel()
}

Once you have a working test you can then iterate on difference behaviors until things break.

I appreciate the feedback. I can get your version of the test to fail by moving the append loop back above the subscription.

But, my goal isn't to fix the test. SomeObject is a stripped-down version of an object that I've been using in a production application. It seems reasonable to assume that client code might subscribe to the object's publisher while/after values are being appended, so enforcing the order of subscriptions and mutations isn't a realistic solution.

You can replicate that as well. Like I said, it's usually a good idea to start with a working test and iterate until something breaks. In this case you can asynchronously append items before subscribing, you just need to structure it well.

func testThatSomeObjectPublishesValuesCorrectlyWhenMutatedBeforeSubscription() {
    // Given
    let obj = SomeObject()
    let values = 0..<10
    let allValuesSent = expectation(description: "all values sent")
    let allValuesReceived = expectation(description: "all values received")
    var lastValuesReceived: [Int]?
    
    // When
    let queue = DispatchQueue(label: "otherQueue")
    for i in values {
        queue.async {
            obj.append(i)
            if i == 9 { allValuesSent.fulfill() }
        }
    }

    let cancellable = obj.values
        .receive(on: DispatchQueue.main)
        .sink { values in
            lastValuesReceived = values
            if values.count == 10 { allValuesReceived.fulfill() }
        }
    waitForExpectations(timeout: 0.5)
    
    XCTAssertEqual(lastValuesReceived, Array(values))
    
    cancellable.cancel()
}

It's also important in async tests to have expectations for all async events, not just those you actually care about. In this case we can ensure we both send and receive events correct. It may not be important now, but the more queues and async actions you add, the more important it is to ensure things complete the way you think they do.

Isn't that new test just moving the append loop back below the subscription by adding it to an async queue?

No, as the separate queue operates in parallel to the main queue the test is actually executing on. It's entirely possible that all of the values are published first or they're all published after, or a mix. Of course, you can try to control the before and after by setting up expectations between the steps, but I'm not sure what the value would be. At some point, exerting control over the async actions in a test actually eliminates the async testing you're trying to do. Attempting to test async code synchronously is a bad idea for the same reason.

I understand, but realistically your async loop is going to run after let cancellable = obj.values... 99 times out of 100. At least that's what I'm seeing. I'm also seeing that test fail about 1 time out of 200.

My fault, sorry. I copied your test wrong - I'd put the loop inside a single async block. After fixing, I am seeing some events sent before the subscription starts. But I'm also still seeing occasional failures.

You're right, I see about a 2% failure rate. Increasing the number of values submitted asynchronously increases that rate. Submitting 100 values gives me a 66% failure rate. Strangely, some of them are missing values within SomeObject's _values. One test my last value was every number from 0 to 99 but missing 46, which seems bizarre. Interestingly, if I make SomeObject's internal queue the main queue, the test is fully reliable. I really don't know what's going on here, but this would probably make for a good bug report or DTS incident.

Interesting, I hadn't tried changing the number of values. Ok thanks for testing it out and verifying, it's nice to think I might not be going crazy. I'll submit a bug report.

Terms of Service

Privacy Policy

Cookie Policy