I'm trying to understand the behavior I observe when running this test (see GitHub repo for Xcode project):
import XCTest
import Combine

class Item {
    deinit {
        // Randomly called by Future.deinit
        // How does this instance's retain count go to zero?
        // Shouldn't the Store retain this, preventing it from being freed?
        // The Store itself is never deinitialized
        print("item deinit")
    }
}

class Store {
    static let shared = Store()
    private let item: Item = Item()
    private let queue = DispatchQueue(label: "Store.queue")

    deinit {
        // Let's rule out the Store instance being deinitialized
        fatalError("store deinit")
    }

    func fetchItem(completion: @escaping (Item) -> Void) {
        queue.async {
            completion(self.item)
        }
    }
}

final class BadAccessPlaygroundTests: XCTestCase {
    func testBadAccess() {
        let expectation = expectation(description: "expectation")
        var task: AnyCancellable?
        task = Future<Item, Never> { promise in
            Store.shared.fetchItem() { result in
                promise(.success(result))
            }
        }
        .sink(
            receiveCompletion: { _ in
                expectation.fulfill()
                task?.cancel()
            }, receiveValue: { _ in }
        )
        wait(for: [expectation])
    }
}
With Xcode 15 (15A240d) on an M2 Mac Studio and an iPhone 15 Pro, I am running the test with 10000 iterations until failure (running it only once or a few times is highly unlikely to trigger the described behavior). I expect the test to never fail or crash. However:
Item is often deinitialized in a random test iteration.
Later, the test may crash with an EXC_BAD_ACCESS (see screenshot).
I thought perhaps it was due to something environmental with XCTest, but I have tried porting the code to a similar implementation in the app itself and was able to hit an EXC_BAD_ACCESS eventually.
I'd love to know if I'm misunderstanding something, doing something unsafe or undefined.
hi! so i think there's at least one data race here on the mutable task variable that's captured within the receiveCompletion closure. if you run this code with the thread sanitizer enabled (and perform sufficiently many repetitions), it should catch it and explain how there is a write to task happening from the main thread, and a read from it occurring on a background thread (when the receive completion is invoked). i think the behavior is nondeterministic since there's an inherent race between the fetchItem() call completing and sink() returning.
to address the race on task, i tried moving the cancelation call to after the expectation is awaited, that way all access occurs on the main thread. this seemed to fix the sanitizer complaint (though i'm not sure if that's the appropriate behavior you're looking for), but curiously, the memory access errors would still usually occur when the test was run with many repetitions. the crashes all suggested something surrounding reference counts/deallocation of the involved types. and FWIW, i never saw Item.deinit() called, though a bug affecting reference counts seems like it could cause that behavior as a side effect. perhaps trying to find a reproduction that doesn't use Combine (or not being able to produce such an example) could further point in the direction of the underlying issue?
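For reference, a minimal sketch of the change described above: the cancel call moves out of receiveCompletion and runs after the wait, so task is only touched on the test's own thread. The Int payload, the DispatchQueue.global() hop standing in for Store.fetchItem, the 5-second timeout, and the class and test names are assumptions made for illustration. As noted above, this only addresses the sanitizer report; it is not claimed to stop the crashes.

```swift
import XCTest
import Combine

final class CancelAfterWaitTests: XCTestCase {
    func testCancelAfterWait() {
        let done = expectation(description: "completion received")
        // `let` instead of `var`: the closure below no longer captures `task`,
        // so nothing reads it from a background thread.
        let task = Future<Int, Never> { promise in
            // Hypothetical stand-in for Store.shared.fetchItem.
            DispatchQueue.global().async {
                promise(.success(1))
            }
        }
        .sink(
            receiveCompletion: { _ in done.fulfill() },
            receiveValue: { _ in }
        )
        wait(for: [done], timeout: 5)
        // Runs on the same thread that created `task`.
        task.cancel()
    }
}
```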
@jamieQ Thank you for the response and the feedback!
I've explored this further and have been able to reproduce the behavior with both Combine and OpenCombine (see GitHub project). I took your suggestion about task into account, and I also discovered that I can simplify the test case further by removing Store entirely.
I'm still puzzled as to why this is happening, and hoping for some sort of workaround or explanation. It's probably time to file a Feedback, I suppose.
@IbrahimalKhayat I just tried it (I had not come across that thread before), and the proposed workaround with Publishers.ReplayOnce did not seem to exhibit the issue.
However, I tried to use .values, and it seemed to deliver inconsistent results. Sometimes the AsyncPublisher would never yield a value. Not sure why.
Do you have any deeper insight into this issue? Do you know if there's something fundamentally wrong with my usage of Future/Combine, or is there a bug with Combine itself? Appreciate your response!
I believe it is a bug in Combine's handling of canceled streams. There seems to be a race condition between canceling a stream and receiving an event. In some rare cases a subscriber receives an event on a stream that was already canceled, hence the EXC_BAD_ACCESS. The framework should handle such cancellations gracefully.
As for the missing values, it is expected that some subscriptions will never yield one. Fixing the race condition means that some subscriptions will be canceled before they receive an event. If this is not what you expect your code to do, then please make sure you are retaining the AnyCancellable reference correctly: an AnyCancellable cancels its subscription when it deinitializes, which happens as soon as nothing holds a strong reference to it, and that could happen before the value is received.
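A small sketch of the retention point, assuming a hypothetical Listener class and a PassthroughSubject as the upstream (neither is from the code in this thread). The first subscription is discarded immediately and therefore canceled; the second is stored and keeps receiving values.

```swift
import Combine

final class Listener {
    private(set) var received: [Int] = []
    // Owning the AnyCancellable here ties the subscription to the Listener's lifetime.
    private var cancellables = Set<AnyCancellable>()

    func listen(to subject: PassthroughSubject<Int, Never>) {
        // Not retained: the AnyCancellable is released at once,
        // which cancels this subscription before any value is sent.
        _ = subject.sink { [weak self] in self?.received.append($0 * 10) }

        // Retained: stays subscribed while `cancellables` holds it.
        subject
            .sink { [weak self] in self?.received.append($0) }
            .store(in: &cancellables)
    }
}

let subject = PassthroughSubject<Int, Never>()
let listener = Listener()
listener.listen(to: subject)
subject.send(1)
print(listener.received) // [1]
```

Only the stored subscription sees the value, which is why the result contains 1 and not 10.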
I noticed that you are using a class type for Item. You need to be very careful not to mutate the instance, as that may cause other inconsistency crashes. I personally avoid using classes with streams and multithreading: they are reference types, so every consumer shares the same instance and can see it mutate if it is used incorrectly. Consider using a struct or conforming your class to Sendable.
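A minimal sketch of the two options, with hypothetical names (ItemValue, ItemRef) and a made-up name property. This is general hygiene for values that cross threads, not a claimed fix for the crash discussed in this thread.

```swift
// Value type: every consumer works on its own copy.
struct ItemValue: Sendable {
    var name: String
}

// A final class with only immutable, Sendable stored properties
// can conform to Sendable.
final class ItemRef: Sendable {
    let name: String
    init(name: String) { self.name = name }
}

let original = ItemValue(name: "a")
var copy = original
copy.name = "b"          // mutates the copy only
print(original.name)     // a

let shared = ItemRef(name: "c")
print(shared.name)       // c
```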
I used the code below to reproduce the crash, with an actor as the Store:
import XCTest
import Combine

class TestClass {
    struct Item {}

    actor Store {
        func get() async throws -> [String: Item] {
            await Task {
                return ["Item": Item()]
            }.value
        }
    }

    let store = Store()

    func getValue() -> AnyPublisher<[String: Item], Error> {
        return Deferred {
            Future { promise in
                Task {
                    do {
                        let value = try await self.store.get()
                        promise(.success(value))
                    } catch {
                        promise(.failure(error))
                    }
                }
            }
        }
        .eraseToAnyPublisher()
    }
}

final class FutureTests: XCTestCase {
    let sut = TestClass()

    func testBadAccess() {
        let task = Task {
            let cancellable = sut.getValue()
                .map {
                    $0.filter {
                        $0.key == "item"
                    }
                }
                .sink(
                    receiveCompletion: { _ in
                    }, receiveValue: {
                        print($0["item"])
                    }
                )
        }
        Task {
            task.cancel()
        }
    }
}
I began investigating this issue because I receive a fair amount of crash reports for my app that closely resemble this one. Memory and thread sanitizers didn't help. This was my minimal reproducible test case.
To elaborate on the issue I was describing in my previous message, I was talking about the proposed ReplayOnce implementation. Here's an actual test case.
func testBadAccessReplayOnceAsync() async {
    let items = Deferred {
        Publishers.ReplayOnce<Item, Never> { promise in
            DispatchQueue.global().async {
                promise(.success(Item()))
            }
        }
    }
    .values
    for await i in items {
        XCTAssertNotNil(i)
        return
    }
    XCTFail()
}
This seems to fail in the 8th or 9th iteration. It seems to be limited to the ReplayOnce implementation. It doesn't fail like this with Future (but Future of course has the EXC_BAD_ACCESS issue…)
This, however, seems to work just fine:
func testBadAccessReplayOnce() {
    let expectation = expectation(description: "expectation")
    var task: AnyCancellable?
    task = Publishers.ReplayOnce<Item, Never> { promise in
        Store.shared.fetchItem() { result in
            promise(.success(result))
        }
    }
    .sink(
        receiveCompletion: { value in
            expectation.fulfill()
        }, receiveValue: { _ in }
    )
    wait(for: [expectation])
    task?.cancel()
}
I'd love to find a solution to fix these crashes in my app. The app uses Combine heavily, and I don't have the capacity to rewrite all of my Combine usage to use something else. But I don't know of any other real solution to fix these crashes. A real bummer.
By the way, I submitted feedback for this back in October: FB13288789
public final class Eventually<Output, Failure: Error>: Publisher {
    public typealias Promise = (Result<Output, Failure>) -> Void

    private let current = CurrentValueSubject<Output?, Failure>(nil)
    private let output: AnyPublisher<Output, Failure>
    private let lock = NSRecursiveLock()

    public init(_ attemptToFulfill: @escaping ((@escaping Promise) -> Void)) {
        let promise: Promise = { [current, lock] result in
            lock.lock()
            defer { lock.unlock() }
            switch result {
            case let .success(value):
                current.send(value)
            case let .failure(error):
                current.send(completion: .failure(error))
            }
        }
        defer {
            attemptToFulfill(promise)
        }
        self.output = current
            .compactMap { $0 }
            .first()
            .eraseToAnyPublisher()
    }

    // MARK: Publisher

    public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        lock.lock()
        defer { lock.unlock() }
        if let value = current.value {
            let just = Just(value).setFailureType(to: Failure.self)
            just.receive(subscriber: subscriber)
        } else {
            output.receive(subscriber: subscriber)
        }
    }
}
We removed completing the CurrentValueSubject with current.send(completion: .finished) and added .first() to the output publisher, as first() finishes the publisher after the first element.
My colleague spent the time figuring this out and had this to say:
Calling send(value) and send(completion) on CurrentValueSubject in quick succession appears to be racy with downstream consumers, meaning that, some of the time, all they will see is a completed publisher (buffer has no effect, either). Converting that completed publisher to an async stream and using it in a for-await loop means that, in the unfortunate case, the loop is skipped altogether. This appears to address the issue.
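The "loop is skipped" case can be illustrated without any race. In this sketch, countValues is a hypothetical helper, and Empty stands in for a publisher that the consumer only ever sees as completed; it does not reproduce the race itself.

```swift
import Combine

// Counts how many elements a never-failing publisher yields through `.values`.
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
func countValues<P: Publisher>(_ publisher: P) async -> Int where P.Failure == Never {
    var count = 0
    // If the publisher completes without emitting, the body never runs.
    for await _ in publisher.values {
        count += 1
    }
    return count
}
```

Calling await countValues(Empty<Int, Never>()) returns 0, because the sequence ends before the loop body runs, while await countValues(Just(1)) returns 1. A test that fails when the loop body never executes would therefore fail whenever the consumer only observes the completion.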
We're going to begin using this solution and will report back if there are any issues.