Bridging the delegate pattern to AsyncStream with Swift 6 and Sendability issues

an issue with doing it this way is that when the stream is 'handed out' to a client, if a Task iterating it is canceled, the stream itself will be terminated, so subsequent clients will not be able to use it after that point. this is presumably undesirable, so i'd personally advise against such an approach, at least in the absence of further motivation.
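to make the termination behavior concrete, here's a minimal sketch that is independent of CoreLocation. the function name `cancellationTerminatesStream` is made up for illustration; it only relies on `AsyncStream.makeStream(of:)` (Swift 5.9+) and the `YieldResult` returned from `yield`:

```swift
// Returns true if a shared stream is unusable after one consumer's Task is cancelled.
// (hypothetical helper, for illustration only)
func cancellationTerminatesStream() async -> Bool {
  let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

  // first client: iterates the shared stream, then gets cancelled
  let firstClient = Task {
    for await _ in stream {}
  }
  firstClient.cancel()
  await firstClient.value

  // the producer can no longer deliver values...
  guard case .terminated = continuation.yield(1) else { return false }

  // ...and a later client sees the stream end immediately
  var iterator = stream.makeAsyncIterator()
  return await iterator.next() == nil
}
```

cancelling the first client terminates the stream for everyone, which is why handing out a single long-lived stream is fragile.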

as Cory mentioned earlier, the data race safety errors can be resolved through the use of a mutex to protect the mutable state which may be accessible from different execution contexts. here's an example that adapts the code to do this through use of OSAllocatedUnfairLock (Mutex could presumably be used instead depending on the minimum deployment target, though the ownership attributes might require minor modifications):

import CoreLocation
import os

final class LocationManager: NSObject, CLLocationManagerDelegate {

  // mutable state that should be protected by a mutex
  struct ProtectedState {
    // continuation for delivering auth status to most-recent caller of `locationUpdates()`
    fileprivate var authorizationStatusStreamContinuation: AsyncStream<CLAuthorizationStatus>.Continuation?
  }

  private let locationManager = CLLocationManager()
  private let protectedState = OSAllocatedUnfairLock(initialState: ProtectedState())

  override init() {
    super.init()
    locationManager.delegate = self
  }

  func locationUpdates() -> AsyncStream<CLAuthorizationStatus> {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in

      // store the current auth status in the new continuation
      //
      // Note: the `CLLocationManager.authorizationStatus()` API is deprecated, so we
      // read the value off the location manager instead. as an aside – this call can
      // sometimes hang while synchronous xpc calls are performed within the CoreLocation
      // framework, so caching the last-known value from the delegate callback may offer
      // a marginally better implementation.
      continuation.yield(locationManager.authorizationStatus)

      // configure tear-down logic
      continuation.onTermination = { [protectedState] _ in
        protectedState.withLock { $0.authorizationStatusStreamContinuation = nil }
      }

      // update internal state
      protectedState.withLock {
        $0.authorizationStatusStreamContinuation = continuation
      }
    }
  }

  func locationManagerDidChangeAuthorization(_ manager: CLLocationManager) {
    // read the new value outside the lock
    let newAuthStatus = manager.authorizationStatus

    // access internal state with the lock held
    let continuation = protectedState.withLock {
      $0.authorizationStatusStreamContinuation
    }

    // notify clients outside the lock
    continuation?.yield(newAuthStatus)
  }
}

this should address the original compiler errors. however, there are a few additional issues to consider with an implementation such as this.

Issue 1: race conditions

the first is that, though we've resolved the risk of data races on the stored continuation, we have not eliminated the possibility of race conditions in how the stored continuation is updated. to illustrate this concern, imagine that we have the following (admittedly contrived) sequence of code:

@MainActor
func example() async {
  // assume we're running on Task 'T0' here
  let manager = LocationManager()
  let stream1 = manager.locationUpdates()

  let t1 = Task.detached {
    // call this Task 'T1'
    for await a in stream1 {
      print("T1 got auth status: \(a)")
    }
  }

  Task.detached {
    // call this Task 'T2'
    t1.cancel()
  }

  let stream2 = manager.locationUpdates()
  for await a in stream2 {
    print("T0 got auth status: \(a)")
  }
}

given the synchronization we've implemented, a possible ordering of events in this case is:

  1. stream1 begins iteration on T1 and suspends
  2. stream2 is created by a call to locationUpdates() on T0, updating the internal continuation
  3. T2 cancels T1, running the onTermination block, which resets the stored continuation to nil

at the end of this sequence, the LocationManager's internal stored continuation will have been set to nil, which means subsequent updates will not be propagated to stream2. this is effectively the analog of two threads racing to set a single (atomic) delegate property.

one possible way to address this is to store the pending continuations in a collection, and remove the appropriate entry in the onTermination blocks. additionally, such an implementation would make supporting 'multi-cast' updates fairly straightforward.
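as a rough sketch of that idea (not a drop-in replacement for the class above): the type name `StatusBroadcaster`, its methods, and the use of `UUID` keys are all hypothetical, and `NSLock` is swapped in only to keep the example free of Apple-only dependencies. `OSAllocatedUnfairLock` or `Mutex` should fit the same shape.

```swift
import Foundation

// Hypothetical helper: fans values out to any number of independent streams.
final class StatusBroadcaster<Element: Sendable>: @unchecked Sendable {
  // `continuations` is only touched with `lock` held
  private let lock = NSLock()
  private var continuations: [UUID: AsyncStream<Element>.Continuation] = [:]

  // number of registered streams (handy for checking the tear-down logic)
  var subscriberCount: Int {
    lock.withLock { continuations.count }
  }

  // each caller gets its own stream, registered under a fresh id
  func makeStream() -> AsyncStream<Element> {
    let id = UUID()
    return AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
      // tear-down removes only this stream's entry, so it can't clobber
      // a continuation that was registered by a later caller
      continuation.onTermination = { [weak self] _ in
        self?.remove(id)
      }
      lock.withLock { continuations[id] = continuation }
    }
  }

  // deliver a value to every registered stream
  func send(_ value: Element) {
    // snapshot the subscribers with the lock held...
    let targets = lock.withLock { Array(continuations.values) }
    // ...and notify them outside the lock
    for continuation in targets {
      continuation.yield(value)
    }
  }

  private func remove(_ id: UUID) {
    lock.withLock { continuations[id] = nil }
  }
}
```

with something like this, `locationUpdates()` would return `makeStream()` (after yielding the current status if desired), and `locationManagerDidChangeAuthorization(_:)` would call `send(_:)`. because each onTermination block only removes its own key, the ordering described above no longer drops updates for stream2.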

Issue 2: RunLoop requirements

initializing a CLLocationManager has some side-effects that matter quite a bit. in particular, it will use the RunLoop of the thread where it is initialized as the one which receives its delegate callbacks (docs). this means that not explicitly managing the execution context of its initialization risks causing bugs or otherwise confusing behavior. personally i'd suggest that whatever type manages the underlying CLLocationManager's setup be marked either @MainActor or internally manage the initialization to ensure it occurs on a thread with a known functional RunLoop. an example of how this can cause problems is if initialization occurs on either a dispatch work queue, or one of the shared cooperative concurrency threads. e.g.

@MainActor
func setupLocationManager_main() async {
  // delegate updates will be delivered on the main RunLoop
  let manager = LocationManager()
  for await update in manager.locationUpdates() {
    // ...
  }
}

nonisolated func setupLocationManager_bg() async {
  // delegate updates will most likely _not_ be delivered since the
  // thread used here won't generally have an active RunLoop
  let manager = LocationManager()
  for await update in manager.locationUpdates() {
    // ...
  }
}