What is a recommended approach for passing non-sendable values to AsyncStream.Continuation.onTermination?

When working with older APIs, I oftentimes wrap them in an AsyncStream to create a nicer API when working with Swift Concurrency. I make use of AsyncStream.Continuation.onTermination to control when to stop the older API from sending updates. This mostly comes up in two cases:

  • The API has an explicit "stop" function which must be called.
  • The API is some kind of reference type, which will automatically cancel when dereferenced.

For example, take this function:

import AppKit

public enum Keyboard {
    public static func updates() -> AsyncStream<NSEvent.ModifierFlags>? {
        let (stream, continuation) = AsyncStream.makeStream(of: NSEvent.ModifierFlags.self)
        
        let monitor = NSEvent.addLocalMonitorForEvents(matching: .keyDown) { event in
            continuation.yield(event.modifierFlags)
            return event
        }

        guard let monitor else { return nil }

        continuation.onTermination = { _ in
            NSEvent.removeMonitor(monitor) // Capture of 'monitor' with non-sendable type 'Any' in a `@Sendable` closure
        }
        
        return stream
    }
}

When trying to cancel the monitor, I get a warning, because the onTermination closure is @Sendable and the Any type of monitor is not Sendable. I was hoping that this would be automagically fixed by enabling region-based isolation, but this doesn't seem to have any effect.

What would be the recommended approach to tackle this issue?

Something I was wondering too: if the onTermination closure were transferring (as described in this post), would this fix the warning? My assumption is that the current warning appears because the compiler cannot guarantee that the monitor is not passed around, which transferring could enforce?

I’m not going to wade into this in the general case, but I’m very concerned about the specific example you posted. Unless otherwise documented, AppKit is a main-thread-only framework. AFAIK NSEvent monitors are not documented to be thread safe. Thus, all of that work would need to be done on the main actor.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

Thank you for letting me know!
I cannot find anything about the thread safety of monitors, so I should indeed assume they should only be called on the main actor.

The onTermination closure is nonisolated though. I am able to fix all warnings by using the following code:

let stopMonitor: @MainActor @Sendable () -> Void = {
    NSEvent.removeMonitor(monitor)
}

continuation.onTermination = { _ in
    Task {
        await stopMonitor()
    }
}

However, this introduces a new Task which isn't ideal. Would there be a better way to achieve this?
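For reference, the main-actor hop can be sketched without AppKit. This is only a minimal sketch, not the NSEvent code itself: `LegacyTicker` and `updates(from:)` are hypothetical names standing in for a main-thread-only API with an explicit stop function. It relies on the fact that a `@MainActor` class is implicitly Sendable, so the `@Sendable` onTermination closure may capture it, and the Task exists only to get back onto the main actor before calling `stop()`.

```swift
// Hypothetical stand-in for an older, main-thread-only API with an explicit stop call.
@MainActor
final class LegacyTicker {
    private(set) var isRunning = false
    var handler: ((Int) -> Void)?

    func start() { isRunning = true }

    func stop() {
        isRunning = false
        handler = nil
    }

    // Simulates the old API delivering an update on the main thread.
    func fire(_ value: Int) {
        if isRunning { handler?(value) }
    }
}

// Hypothetical wrapper: exposes the ticker's updates as an AsyncStream.
@MainActor
func updates(from ticker: LegacyTicker) -> AsyncStream<Int> {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    ticker.handler = { value in
        continuation.yield(value)
    }
    ticker.start()

    // onTermination is @Sendable and has no isolation, so it cannot call
    // stop() directly. The Task only hops back to the main actor.
    continuation.onTermination = { _ in
        Task { @MainActor in
            ticker.stop()
        }
    }

    return stream
}
```

Note that the stop call then happens asynchronously, some time after the stream terminates, so the old API may still deliver an update in between; yielding to an already terminated continuation is harmless.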

Not sure how far you'll get with this, but you might need to implement an AsyncSequence instead, e.g. AsyncTimerSequence in apple/swift-async-algorithms (Sources/AsyncAlgorithms/AsyncTimerSequence.swift on main).

"However, this introduces a new Task which isn't ideal. Would there be a better way to achieve this?"

Given that:

  • onTermination has no isolation constraints

  • Event monitors are main-thread-only

I don’t see any way to avoid that task.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

I will look into this, thank you!

In my code, I created a simple property wrapper to convert any property into an AsyncStream, and I'm facing a similar problem:

@propertyWrapper
public final class ToAsyncStream<Value> where Value: Sendable {
  private var value: Value
  private var continuations: [UUID: AsyncStream<Value>.Continuation] = [:]

  public var wrappedValue: Value {
    get { value }
    set {
      value = newValue
      continuations.values.forEach { $0.yield(value) }
    }
  }

  public var projectedValue: AsyncStream<Value> {
    AsyncStream<Value>(bufferingPolicy: .unbounded) { continuation in
      let id = UUID()
      continuations[id] = continuation

      continuation.yield(value)

      continuation.onTermination = { _ in
        // Capture of 'self' with non-sendable type 'ToAsyncStream<Value>' in a `@Sendable` closure; this is an error in the Swift 6 language mode
        // Utils.swift(33, 20): Generic class 'ToAsyncStream' does not conform to the 'Sendable' protocol
        self.continuations.removeValue(forKey: id)
      }
    }
  }

  public init(wrappedValue: Value) {
    self.value = wrappedValue
  }
}

Can someone recommend the best way to get rid of these errors here? Or perhaps you know a better approach to convert a property into an AsyncStream?

I use this approach instead of Combine.

welcome to the Swift forums! that warning is highlighting the data race that can occur if the continuations dictionary is allowed to be concurrently accessed from different execution contexts without synchronization. depending on your required Swift version and platform, you might consider using Mutex or OSAllocatedUnfairLock as a means of protecting that state. with your synchronization tool of choice you'd then change the dictionary use to occur only when you've ensured mutually exclusive access. e.g. something like:

// replace `continuations` property with something like this
let protectedState = OSAllocatedUnfairLock(initialState: [UUID: AsyncStream<Value>.Continuation]())

// ...

protectedState.withLock { $0[id] = continuation }

// ...

continuation.onTermination = { [protectedState] _ in
  protectedState.withLock { $0.removeValue(forKey: id) }
}

This code is showing an error:

Conflicting arguments to generic parameter 'R' ('Void' vs. 'AsyncStream<Value>.Continuation?')

I replaced the last bit of code with:

_ = protectedState.withLock { $0.removeValue(forKey: id) }

But all of this feels like overkill. What if I want to call a method on the class inside onTermination?

perhaps you can share your full example so the error can be reproduced.

the fundamental issue that caused the initial error is that the onTermination closure is @Sendable[1], meaning it must be known (to the compiler) to be safe to call from any execution context. in your example, self is not Sendable, so cannot be captured by the termination handler.

the option i offered earlier dealt with this by not capturing self and just using the mutable state (safely accessed via a mutex) that needed to be updated after a stream terminates.

if you want to use self within the termination handler, then you will need to ensure it is Sendable. you could annotate the type as it exists as @unchecked Sendable. this would effectively be 'promising' the compiler that you're handling mutable state safely (you would still need to protect the state mutation somehow). alternatively you may be able to refactor the property wrapper as a struct in which case, if all its stored properties are Sendable, then it can conform to Sendable without the @unchecked annotation.
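The `@unchecked Sendable` option could look roughly like the sketch below. It is one possible shape under stated assumptions, not a definitive implementation: it swaps in a plain Foundation `NSLock` instead of Mutex or OSAllocatedUnfairLock so that it does not depend on a particular OS version, and the helper names `register(_:id:)` and `removeContinuation(_:)` are made up for illustration. Every access to `value` and `continuations` happens while the lock is held, which is the promise `@unchecked Sendable` makes to the compiler, and the termination handler can then call a method on the class.

```swift
import Foundation

// Sketch only: `@unchecked Sendable` is a promise to the compiler, kept here
// by touching `value` and `continuations` only while `lock` is held.
@propertyWrapper
public final class ToAsyncStream<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value
    private var continuations: [UUID: AsyncStream<Value>.Continuation] = [:]

    public init(wrappedValue: Value) {
        self.value = wrappedValue
    }

    public var wrappedValue: Value {
        get {
            lock.lock()
            defer { lock.unlock() }
            return value
        }
        set {
            lock.lock()
            defer { lock.unlock() }
            value = newValue
            for continuation in continuations.values {
                continuation.yield(newValue)
            }
        }
    }

    public var projectedValue: AsyncStream<Value> {
        AsyncStream<Value>(bufferingPolicy: .unbounded) { continuation in
            let id = UUID()
            // `self` is now Sendable, so the @Sendable handler may capture it.
            // The weak capture avoids keeping the wrapper alive through a
            // stored continuation.
            continuation.onTermination = { [weak self] _ in
                self?.removeContinuation(id)
            }
            self.register(continuation, id: id)
        }
    }

    // Hypothetical helper: stores the continuation and replays the current value.
    private func register(_ continuation: AsyncStream<Value>.Continuation, id: UUID) {
        lock.lock()
        defer { lock.unlock() }
        continuations[id] = continuation
        continuation.yield(value)
    }

    // Hypothetical helper: called from onTermination, possibly on any thread.
    private func removeContinuation(_ id: UUID) {
        lock.lock()
        defer { lock.unlock() }
        continuations.removeValue(forKey: id)
    }
}
```

The lock is not recursive, so this sketch never finishes a continuation while holding it; doing so would run the termination handler and try to take the lock a second time.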

as to whether or not this is 'overkill'... i suppose that is subjective, but the code you originally shared had the potential for a data race – the continuations dictionary could be written & read from multiple threads (without synchronization).


  1. however this fact doesn't appear to be as clear in the documentation as it perhaps could be. ↩︎
