In hopes of fully adopting concurrency with Swift 6, I was building a wrapper around CoreLocation, and while for the most part things work. The problem arises when providing a way to listen to any changes in authorization status or user location, the proper async way to do would be AsyncStreams, and while they do work, nullifying that property when the stream terminates is rather challenging. As any and every trick I try to nullify that continuation results in some variation of Capture of 'self' with non-sendable type 'LocationManager' in a @Sendable closure. While I kind of understand the issue, I don't see how this can be an issue in this given scenario. The main object cannot be an actor as the error becomes Actor-isolated property 'authorizationStatusStreamContinuation' can not be mutated from a Sendable closure regardless of other issues that arise, and it cannot be sendable as CLLOcationManager is not Sendable.
All the solutions I found online work with Swift 5.5, but none with Swift 6
Am I doing something wrong, or is it just a non-solvable issue?
Below is a stripped down version of the necessary code:
public final class LocationManager: NSObject, CLLocationManagerDelegate {
fileprivate var authorizationStatusStreamContinuation: AsyncStream<CLAuthorizationStatus>.Continuation?
private let locationManager = CLLocationManager()
public override init() {
super.init()
locationManager.delegate = self
}
public func locationUpdates() -> AsyncStream<CLAuthorizationStatus> {
AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
authorizationStatusStreamContinuation = continuation
continuation.yield(CLAuthorizationStatus(CLLocationManager.authorizationStatus()))
continuation.onTermination = { _ in
self.authorizationStatusStreamContinuation = nil // THIS IS WHERE THE ISSUE LIES
}
}
}
public func locationManagerDidChangeAuthorization(_ manager: CLLocationManager) {
authorizationStatusStreamContinuation?.yield(manager.authorizationStatus)
}
}
Hmm… what about nonisolated(unsafe) on the locationManager instance just to silence the errors (and then guarantee yourself that access is serialized)?
It's still the same issue as the problem is capturing self. And even if you try and capture the function alone, the error becomes Capture of 'terminationHelper' with non-sendable type '() -> ()' in a @Sendable closure
This too isn't an option as the manager contains a few stored properties (other continuations) the version I shared is a stripped down version.
It just feels like one of those things that just cannot be done but I do not understand why.
The problem you have is that the onTermination callback is called in context of the thread doing the stream iteration, not your context. That means it is definitely not thread-safe, and you really do need to take steps to make it so.
In this case, you can import Synchronization and then wrap the continuation in a Mutex. Then use the mutex to synchronise the accesses including setting the continuation to nil.
an issue with doing it this way is that when the stream is 'handed out' to a client, if a Task iterating it is canceled, the stream itself will be terminated, so subsequent clients will not be able to use it after that point. this is presumably undesirable, so i'd personally advise against such an approach, at least in the absence of further motivation.
as Cory mentioned earlier, the data race safety errors can be resolved through the use of a mutex to protect the mutable state which may be accessible from different execution contexts. here's an example that adapts the code to do this through use of OSAllocatedUnfairLock (Mutex could presumably be used instead depending on the minimum deployment target, though the ownership attributes might require minor modifications):
final class LocationManager: NSObject, CLLocationManagerDelegate {
// mutable state that should be protected by a mutex
struct ProtectedState {
// continuation for delivering auth status to most-recent caller of `locationUpdates()`
fileprivate var authorizationStatusStreamContinuation: AsyncStream<CLAuthorizationStatus>.Continuation?
}
private let locationManager = CLLocationManager()
private let protectedState = OSAllocatedUnfairLock(initialState: ProtectedState())
override init() {
super.init()
locationManager.delegate = self
}
func locationUpdates() -> AsyncStream<CLAuthorizationStatus> {
AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
// store the current auth status in the new continuation
//
// Note: the `CLLocationManager.authorizationStatus()` API is deprecated, so we
// read the value off the location manager instead. as an aside – this call can
// sometimes hang while synchronous xpc calls are performed within the CoreLocation
// framework, so caching the last-known value from the delegate callback may offer
// a marginally better implementation.
continuation.yield(locationManager.authorizationStatus)
// configure tear-down logic
continuation.onTermination = { [protectedState] _ in
protectedState.withLock { $0.authorizationStatusStreamContinuation = nil }
}
// update internal state
protectedState.withLock {
$0.authorizationStatusStreamContinuation = continuation
}
}
}
func locationManagerDidChangeAuthorization(_ manager: CLLocationManager) {
// read the new value outside the lock
let newAuthStatus = manager.authorizationStatus
// access internal state with the lock held
let continuation = protectedState.withLock {
$0.authorizationStatusStreamContinuation
}
// notify clients outside the lock
continuation?.yield(newAuthStatus)
}
}
this should address the original compiler errors. however, there are a few additional issues to consider with an implementation such as this.
Issue 1: race conditions
the first is that, though we've resolved the risks of data-races on the stored continuation, we have not eliminated the possibility that the changes to the stored continuation are free from race conditions. to illustrate this concern, imagine that we have the following (admittedly contrived) sequence of code:
@MainActor
func example() async {
// assume we're running on Task 'T0' here
let manager = LocationManager()
let stream1 = manager.locationUpdates()
let t1 = Task.detached {
// call this Task 'T1'
for await a in stream1 {
print("T1 got auth status: \(a)")
}
}
Task.detached {
// call this Task 'T2'
t1.cancel()
}
let stream2 = manager.locationUpdates()
for await a in stream2 {
print("T0 got auth status: \(a)")
}
}
given the synchronization we've implemented, a possible ordering of events in this case is:
stream1 begins iteration on T1 and suspends
stream2 is created by a call to locationUpdates() on T0, updating the internal continuation
T2 cancels T1, running the onTermination block, which resets the stored continuation to nil
at the end of this sequence, the LocationManager's internal stored continuation will have been set to nil, which means subsequent updates will not be propagated to stream2. this is effectively the analog of two threads racing to set a single (atomic) delegate property.
one possible way to address this is to store the pending continuations in a collection, and remove the appropriate entry in the onTermination blocks. additionally, such an implementation would make supporting 'multi-cast' updates fairly straightforward.
Issue 2: RunLoop requirements
initializing a CLLocationManager has some side-effects that matter quite a bit. in particular, it will use the RunLoop of the thread where it is initialized as the one which receives its delegate callbacks (docs). this means that not explicitly managing the execution context of its initialization risks causing bugs or otherwise confusing behavior. personally i'd suggest that whatever type manages the underlying CLLocationManager's setup be marked either @MainActor or internally manage the initialization to ensure it occurs on a thread with a known functional RunLoop. an example of how this can cause problems is if initialization occurs on either a dispatch work queue, or one of the shared cooperative concurrency threads. e.g.
@MainActor
func setupLocationManager_main() {
// delegate updates will be delivered on the main RunLoop
let manager = LocationManager()
for await update in manager.locationUpdates() {
// ...
}
}
nonisolated func setupLocationManager_bg() {
// delegate updates will most likely _not_ be delivered since the
// thread used here won't generally have an active RunLoop
let manager = LocationManager()
for await update in manager.locationUpdates() {
// ...
}
}
UPDATE: It was because using a single AsyncStream<CLAuthorizationStatus>.Continuation just like you mentioned in your Issue 1: race conditions. So the last one was overriding the first, by switching to a dictionary it solved the issue.
I applied your solution and it does work so thank you. However when testing for the two issues you mentioned, I noticed that if I write this code
Task { [manager] in
let updates = manager.authorizationUpdates()
for await value in updates {
print("Task1:", value)
}
}
Task { [manager] in
let updates = manager.authorizationUpdates()
for await value in updates {
print("Task2:", value)
}
}
in the initializer of a SwiftUI view, so on the main thread, it runs both tasks 1 time, but then future updates only get invoked by the second thread.