When working with older APIs, I oftentimes wrap them in an AsyncStream to create a nicer API when working with Swift Concurrency. I make use of AsyncStream.Continuation.onTermination to control when to stop the older API from sending updates. This mostly happens with two cases:
The API has an explicit "stop" function which must be called.
The API is some kind of reference type, which will automatically cancel when dereferenced.
For example, take this function
import AppKit
public enum Keyboard {
public static func updates() -> AsyncStream<NSEvent.ModifierFlags>? {
let (stream, continuation) = AsyncStream.makeStream(of: NSEvent.ModifierFlags.self)
let monitor = NSEvent.addLocalMonitorForEvents(matching: .keyDown) { event in
continuation.yield(event.modifierFlags)
return event
}
guard let monitor else { return nil }
continuation.onTermination = { _ in
NSEvent.removeMonitor(monitor) // Capture of 'monitor' with non-sendable type 'Any' in a `@Sendable` closure
}
return stream
}
}
When trying to cancel the monitor, I will get a warning, because the onTermination closure is Sendable, and the Any type of monitor is not. I was hoping that this would be automagically fixed by enabling Region-based isolation, but this doesn't seem to have any effect.
What would be the recommended approach to tackle this issue?
Something I was wondering too: if the onTermination closure would be transferring (like described in this post), would this fix the warning? My assumption is that the current warning appears because the compiler cannot guarantee that the monitor would not be passed around, which transferring could enforce?
I’m not going to wade into this in the general case, but I’m very concerned about the specific example you posted. Unless otherwise documented AppKit is a main-thread-only framework. AFAIK NSEvent monitors are not documented to be thread safe. Thus, all of that work would need to be done on the main actor.
Thank you for letting me know!
I cannot find anything about the thread safety of monitors, so I should indeed assume they should only be called on the main actor.
The onTermination function is nonisolated though. I am able to fix all warnings by using the following code
In my code, I created a simple property wrapper to convert any property into an AsyncStream, and I'm facing a similar problem:
@propertyWrapper
public final class ToAsyncStream<Value> where Value: Sendable {
private var value: Value
private var continuations: [UUID: AsyncStream<Value>.Continuation] = [:]
public var wrappedValue: Value {
get { value }
set {
value = newValue
continuations.values.forEach { $0.yield(value) }
}
}
public var projectedValue: AsyncStream<Value> {
AsyncStream<Value>(bufferingPolicy: .unbounded) { continuation in
let id = UUID()
continuations[id] = continuation
continuation.yield(value)
continuation.onTermination = { _ in
// Capture of 'self' with non-sendable type 'ToAsyncStream<Value>' in a `@Sendable` closure; this is an error in the Swift 6 language modeSourceKiterror-in-future-swift-version
// Utils.swift(33, 20): Generic class 'ToAsyncStream' does not conform to the 'Sendable' protocol
self.continuations.removeValue(forKey: id)
}
}
}
public init(wrappedValue: Value) {
self.value = wrappedValue
}
}
Can someone recommend what would be the best idea to get rid of these errors here? Or perhaps you know a better approach to convert a property into an AsyncStream?
welcome to the Swift forums! that warning is highlighting the data race that can occur if the continuations dictionary is allowed to be concurrently accessed from different executions contexts without synchronization. depending on your required Swift version and platform, you might consider using Mutex or OSAllocatedUnfairLock as a means of protecting that state. with your synchronization tool of choice you'd then change the dictionary use to occur only when you've ensured mutually exclusive access. e.g. something like:
// replace `continuations` property with something like this
let protectedState = OSAllocatedUnfairLock(initialState: [UUID: AsyncStream<Value>.Continuation]())
// ...
protectedState.withLock { $0[id] = continuation }
// ...
continuation.onTermination = { [protectedState] _ in
protectedState.withLock { $0.removeValue(forKey: id) }
}
perhaps you can share your full example so the error can be reproduced.
the fundamental issue that caused the initial error is that the onTermination closure is @Sendable[1], meaning it must be known (to the compiler) to be safe to call from any execution context. in your example, self is not Sendable, so cannot be captured by the termination handler.
the option i offered earlier dealt with this by not capturing self and just using the mutable state (safely accessed via a mutex) that needed to be updated after a stream terminates.
if you want to use self within the termination handler, then you will need to ensure it is Sendable. you could annotate the type as it exists as @unchecked Sendable. this would effectively be 'promising' the compiler that you're handling mutable state safely (you would still need to protect the state mutation somehow). alternatively you may be able to refactor the property wrapper as a struct in which case, if all its stored properties are Sendable, then it can conform to Sendable without the @unchecked annotation.
as to whether of not this is 'overkill'... i suppose that is subjective, but the code you originally shared had the potential for a data race – the continuations dictionary could be written & read from multiple threads (without synchronization).
however this fact doesn't appear to be as clear in the documentation as it perhaps could be. ↩︎