I made this property wrapper that turns a stored property into an async sequence.
Its semantics are similar to Combine’s CurrentValueSubject. Multiple clients are allowed. By default, buffering is disabled.
What do you think?
Thank you, @nikolai.ruhe
I have the following code.
import Foundation
import Dispatch

@main
enum Driver {
    static func main () async throws {
        let executors: [MySerialExecutor] = [
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-1")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-2")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-3")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-4")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-5")),
        ]
        let btm = MyBT.Manager ()
        var x = 0
        for executor in executors {
            Task (executorPreference: executor) {
                try await monitor (stateOf: btm, id: x)
            }
            x += 1
        }
        let executor = MySerialExecutor (queue: DispatchQueue(label: "com.example.background-x"))
        Task (executorPreference: executor) {
            try await change(stateOf: btm)
        }
        try await Task.sleep (until: .now + .seconds (60))
    }
}

func monitor (stateOf btm: MyBT.Manager, id: Int) async throws {
    var x = 0
    while x < 10 {
        x += 1
        await btm.m.waitForBluetoothToPowerOn(id: id)
        try await Task.sleep (until: .now + .seconds (3))
    }
}

func change (stateOf btm: MyBT.Manager) async throws {
    let states: [BluetoothManager.CBManagerState] = [.unknown, .poweredOn, .poweredOff]
    var x = 0
    while x < 20 {
        await btm.set (state: states [x % states.count])
        x += 1
        try await Task.sleep (until: .now + .seconds (3))
    }
}

enum MyBT {
    actor Manager {
        let m = BluetoothManager ()
        func set (state: BluetoothManager.CBManagerState) {
            m.bluetoothState = state
        }
    }
}
//
// BlueToothManager.swift
// Anything
//
// Created by ibex on 15/8/2025.
//
// [https://forums.swift.org/t/streaming-looking-for-feedback/81648]
#if false
class BluetoothManager {
    @Streaming
    var bluetoothState: CBManagerState = .unknown
    func waitForBluetoothToPowerOn() async {
        for await state in $bluetoothState where state == .poweredOn {
            break
        }
    }
}
#else
class BluetoothManager {
    @Streaming
    var bluetoothState: CBManagerState = .unknown
    func waitForBluetoothToPowerOn(id: Int) async {
        print ("--> enter", id, bluetoothState)
        for await state in $bluetoothState where (state == .poweredOn || state == .poweredOff) {
            print ("-->", id, bluetoothState)
            break
        }
        print ("--> exit", id, bluetoothState)
    }
    enum CBManagerState {
        case unknown, poweredOn, poweredOff
    }
}
#endif
//
// MySerialExecutor.swift
// Anything
//
// Created by ibex on 15/8/2025.
//
// [DeepSeek]
import Dispatch

final class MySerialExecutor: SerialExecutor, TaskExecutor {
    private let queue: DispatchQueue
    init (queue: DispatchQueue) {
        self.queue = queue
    }
    func enqueue (_ job: consuming ExecutorJob) {
        let unownedJob = UnownedJob(job)
        let unownedExecutor = asUnownedSerialExecutor()
        queue.async {
            unownedJob.runSynchronously(on: unownedExecutor)
        }
    }
    func asUnownedSerialExecutor() -> UnownedSerialExecutor {
        UnownedSerialExecutor(ordinary: self)
    }
}
But the output I get doesn't look right: not all monitors appear to run, and some enter multiple times before exiting.
--> enter 1 unknown
--> enter 4 unknown
--> enter 2 unknown
--> enter 5 unknown
--> enter 5 unknown
--> 1 poweredOn
--> 5 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> 4 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> exit 5 poweredOn
--> exit 4 poweredOn
--> exit 1 poweredOn
--> enter 4 poweredOff
--> enter 5 poweredOff
--> enter 2 poweredOff
--> enter 1 poweredOff
--> enter 5 poweredOff
--> 5 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> exit 5 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> enter 5 poweredOff
--> enter 4 poweredOff
--> enter 1 poweredOff
--> enter 2 poweredOff
--> enter 5 poweredOff
--> 2 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 5 poweredOn
--> exit 2 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> exit 5 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> enter 1 poweredOn
--> enter 2 poweredOn
--> enter 4 poweredOff
--> 2 poweredOff
--> enter 5 poweredOff
--> enter 5 poweredOff
--> exit 2 poweredOff
--> 1 poweredOff
--> exit 1 poweredOff
--> enter 1 unknown
--> enter 2 unknown
--> 5 poweredOn
--> 2 poweredOn
--> 5 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> exit 2 poweredOn
--> 1 poweredOn
--> exit 5 poweredOn
--> exit 5 poweredOn
--> exit 1 poweredOn
--> enter 2 poweredOff
--> enter 5 poweredOff
--> enter 4 poweredOff
--> enter 1 poweredOff
--> enter 5 poweredOff
--> 2 poweredOn
--> 1 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> exit 2 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> exit 1 poweredOn
--> enter 5 poweredOn
--> enter 5 poweredOn
--> 5 poweredOff
--> 5 poweredOff
--> exit 5 poweredOff
--> enter 4 poweredOff
--> enter 1 poweredOff
--> exit 5 poweredOff
--> enter 2 poweredOff
--> enter 5 unknown
--> enter 5 unknown
--> 2 poweredOn
--> exit 2 poweredOn
--> 5 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> exit 5 poweredOn
--> 4 poweredOn
--> 1 poweredOn
--> exit 4 poweredOn
--> exit 1 poweredOn
--> enter 2 poweredOff
--> enter 5 poweredOff
--> enter 1 poweredOff
--> enter 5 poweredOff
--> enter 4 poweredOff
--> 4 poweredOn
--> 2 poweredOn
--> 5 poweredOn
--> exit 4 poweredOn
--> 5 poweredOn
--> exit 5 poweredOn
--> 1 poweredOn
--> exit 2 poweredOn
--> exit 1 poweredOn
--> exit 5 poweredOn
Program ended with exit code: 0
What am I doing wrong?
What’s the motivation for using a global shared “nextID” counter vs one stored in the property wrapper itself? Or even UUIDs as the continuation identifier?
@ibex10 Sorry, your code does not compile for me: "Non-Sendable type 'BluetoothManager' of property 'm' cannot exit actor-isolated context" at line
await btm.m.waitForBluetoothToPowerOn(id: id)
Also, your test rig is somewhat complicated, and without diving deep into the many parts involved (serial executors, dispatch queues, actor instances, ...) I don't feel able to predict its output.
That being said, your pasted output does look suspicious: I would have expected it to start with "--> enter x unknown" lines for x = 1 ... 5. But in your case, 3 is missing and 5 is present twice. That looks fishy.
Edit:
Thinking about it again, the wrong numbers look like a race condition in the Task spawning loop. Could you try switching to the Swift 6 language mode?
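The suspected capture problem can be illustrated without tasks or executors. This is only a sketch under the assumption that the Task closure reads the loop variable after the loop has already moved on. The helper name capturedIDs is made up for the illustration and is not part of the code in this thread.

```swift
// Hypothetical helper, not part of the code posted in this thread.
// A closure captures a local `var` by reference unless the variable
// appears in the closure's capture list.
func capturedIDs() -> (byReference: [Int], byValue: [Int]) {
    var x = 0
    var byReference: [() -> Int] = []
    var byValue: [() -> Int] = []
    for _ in 0..<3 {
        byReference.append { x }        // reads `x` when the closure runs
        byValue.append { [x] in x }     // copies `x` when the closure is created
        x += 1
    }
    // All closures run only after the loop has finished.
    return (byReference: byReference.map { $0() }, byValue: byValue.map { $0() })
}
```

Here the by-reference closures all see the final value of x, while the closures with a capture list keep the value from their own iteration. In the driver, the analogous change would be to copy the counter into a let constant before creating the Task, or to add [x] to the Task closure's capture list.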
One more thought: if you are missing entries, this might be caused by the default buffering strategy, which is unbuffered. If your clients need to see every change, even when they consume the stream more slowly than the source produces values, you should try buffering: @Streaming(bufferingPolicy: .unbounded)
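To see what a buffering policy does to values that arrive while nobody is consuming, here is a small sketch using plain AsyncStream. It does not use @Streaming itself, and treating .bufferingNewest(0) as a stand-in for the wrapper's unbuffered mode is an assumption. The function name droppedYields is hypothetical.

```swift
// Hypothetical helper: yields three values into a stream that nobody is
// consuming and counts how many yields the continuation reports as dropped.
func droppedYields(policy: AsyncStream<Int>.Continuation.BufferingPolicy) -> Int {
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self, bufferingPolicy: policy)
    var dropped = 0
    // Keep the stream alive while yielding so the continuation is not
    // terminated before the values are sent.
    withExtendedLifetime(stream) {
        for value in 1...3 {
            if case .dropped = continuation.yield(value) {
                dropped += 1
            }
        }
    }
    continuation.finish()
    return dropped
}
```

With .unbounded, no yield is reported as dropped. With .bufferingNewest(1), the two older values are displaced by newer ones. With a limit of 0, every value sent while no consumer is waiting is lost, which is the kind of behaviour that could make a slow client miss a state change.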
There's no actual reason behind using a global source of unique IDs. And you're right, in this case, synchronisation is not even necessary. A simple property in the wrapper would have done.
When writing the code I was actually considering getting rid of the id for good. SE-0468 makes AsyncStream's continuation conform to Hashable. But I needed to target older Swift versions, so for now I keep the id.
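For readers following along, this is roughly what an instance-local id could look like. It is a minimal sketch and not the wrapper's actual implementation: the names Broadcaster, subscribe, send and subscriberCount are invented for the illustration, and the locking scheme is just one possible choice.

```swift
import Foundation

// Hypothetical stand-in for the wrapper's storage; not the code from this thread.
final class Broadcaster<Value: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value
    private var nextID = 0   // instance-local id source, guarded by `lock`
    private var continuations: [Int: AsyncStream<Value>.Continuation] = [:]

    init(_ initial: Value) {
        value = initial
    }

    // Number of currently registered client streams.
    var subscriberCount: Int {
        lock.lock(); defer { lock.unlock() }
        return continuations.count
    }

    // Stores the new value and forwards it to every registered client.
    func send(_ newValue: Value) {
        lock.lock(); defer { lock.unlock() }
        value = newValue
        for continuation in continuations.values {
            continuation.yield(newValue)
        }
    }

    // Registers a new client; like CurrentValueSubject, it first receives the current value.
    func subscribe(
        bufferingPolicy: AsyncStream<Value>.Continuation.BufferingPolicy = .unbounded
    ) -> AsyncStream<Value> {
        let (stream, continuation) = AsyncStream<Value>.makeStream(bufferingPolicy: bufferingPolicy)
        lock.lock()
        let id = nextID
        nextID += 1
        continuations[id] = continuation
        continuation.yield(value)
        lock.unlock()
        // Forget the continuation once the client stops iterating or drops the stream.
        continuation.onTermination = { [weak self] _ in self?.remove(id) }
        return stream
    }

    private func remove(_ id: Int) {
        lock.lock(); defer { lock.unlock() }
        continuations[id] = nil
    }
}
```

Because the counter lives inside the instance and is only touched under the instance's own lock, no global state is needed. On toolchains that include SE-0468, the dictionary could presumably be replaced by a set of continuations, which would remove the id entirely.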
@nikolai.ruhe, thank you.
Got it working now, in Swift 6, after turning the BluetoothManager into an actor and starting to use @Streaming(bufferingPolicy: .unbounded).
//
// Driver.swift
// AsyncStreamPropertyWrapper
//
// Created by ibex on 15/8/2025.
//
import Foundation
import Dispatch

@main
enum Driver {
    static func main () async throws {
        let executors: [MySerialExecutor] = [
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-1")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-2")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-3")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-4")),
            MySerialExecutor (queue: DispatchQueue (label: "com.example.background-5")),
        ]
        let btm = BluetoothManager ()
        let counter = Counter ()
        for executor in executors {
            let x = await counter.value ()
            Task (executorPreference: executor) {@Sendable in
                try await monitor (stateOf: btm, id: x)
            }
            await counter.increment()
        }
        let executor = MySerialExecutor (queue: DispatchQueue(label: "com.example.background-x"))
        Task (executorPreference: executor) {
            try await change (stateOf: btm)
        }
        try await Task.sleep (until: .now + .seconds (60))
    }
}

actor Counter {
    var x: Int = 0
    func increment () async {
        x += 1
    }
    func value () async -> Int {
        x
    }
}

func monitor (stateOf btm: BluetoothManager, id: Int) async throws {
    var x = 0
    while x < 10 {
        x += 1
        await btm.waitForBluetoothToPowerOn(id: id)
        try await Task.sleep (until: .now + .seconds (3))
    }
}

func change (stateOf btm: BluetoothManager) async throws {
    let states: [BluetoothManager.CBManagerState] = [.unknown, .poweredOn, .poweredOff]
    var x = 0
    while x < 20 {
        await btm.set (state: states [x % states.count])
        x += 1
        try await Task.sleep (until: .now + .seconds (3))
    }
}

// [[https://forums.swift.org/t/streaming-looking-for-feedback/81648]]
actor BluetoothManager {
    @Streaming(wrappedValue: CBManagerState.unknown, bufferingPolicy: .unbounded)
    var bluetoothState: CBManagerState
    func waitForBluetoothToPowerOn(id: Int) async {
        print ("--> enter", id, bluetoothState)
        for await state in $bluetoothState where (state == .poweredOn || state == .poweredOff) {
            print ("-->", id, bluetoothState)
            break
        }
        print ("--> exit", id, bluetoothState)
    }
    func set (state: BluetoothManager.CBManagerState) {
        bluetoothState = state
    }
    enum CBManagerState {
        case unknown, poweredOn, poweredOff
    }
}
--> enter 0 unknown
--> enter 1 unknown
--> enter 2 unknown
--> enter 3 unknown
--> enter 4 unknown
--> 0 poweredOn
--> exit 0 poweredOn
--> 3 poweredOn
--> exit 3 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> enter 3 poweredOff
--> enter 1 poweredOff
--> enter 4 poweredOff
--> enter 0 poweredOff
--> enter 2 poweredOff
--> 4 poweredOn
--> exit 4 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 3 poweredOn
--> exit 3 poweredOn
--> 0 poweredOn
--> exit 0 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> enter 0 poweredOff
--> enter 1 poweredOff
--> enter 3 poweredOff
--> enter 2 poweredOff
--> enter 4 poweredOff
--> 3 poweredOn
--> exit 3 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 0 poweredOn
--> exit 0 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> enter 1 poweredOn
--> enter 0 poweredOn
--> enter 3 poweredOn
--> enter 2 poweredOn
--> enter 4 poweredOff
--> 2 poweredOff
--> exit 2 poweredOff
--> 0 poweredOff
--> exit 0 poweredOff
--> 3 poweredOff
--> exit 3 poweredOff
--> 1 poweredOff
--> exit 1 poweredOff
--> enter 2 poweredOff
--> enter 3 poweredOff
--> enter 0 poweredOff
--> enter 1 unknown
--> 3 poweredOn
--> exit 3 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 0 poweredOn
--> exit 0 poweredOn
--> enter 4 poweredOn
--> enter 1 poweredOn
--> enter 0 poweredOff
--> enter 3 poweredOff
--> enter 2 poweredOff
--> 1 poweredOff
--> exit 1 poweredOff
--> 4 poweredOff
--> exit 4 poweredOff
--> enter 1 poweredOff
--> enter 4 poweredOff
--> 1 poweredOn
--> exit 1 poweredOn
--> 3 poweredOn
--> exit 3 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 0 poweredOn
--> exit 0 poweredOn
--> enter 2 poweredOff
--> enter 4 poweredOff
--> enter 3 poweredOff
--> enter 0 poweredOff
--> enter 1 poweredOff
--> 0 poweredOn
--> exit 0 poweredOn
--> 2 poweredOn
--> exit 2 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> 3 poweredOn
--> exit 3 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> enter 2 poweredOn
--> enter 0 poweredOn
--> enter 4 poweredOn
--> enter 3 poweredOn
--> enter 1 poweredOff
--> 0 poweredOff
--> exit 0 poweredOff
--> 4 poweredOff
--> exit 4 poweredOff
--> 3 poweredOff
--> exit 3 poweredOff
--> 2 poweredOff
--> exit 2 poweredOff
--> enter 2 poweredOff
--> enter 4 unknown
--> enter 0 unknown
--> enter 3 unknown
--> 2 poweredOn
--> exit 2 poweredOn
--> 0 poweredOn
--> exit 0 poweredOn
--> 1 poweredOn
--> exit 1 poweredOn
--> 4 poweredOn
--> exit 4 poweredOn
--> 3 poweredOn
--> exit 3 poweredOn
--> enter 4 poweredOn
--> enter 3 poweredOn
--> enter 1 poweredOn
Program ended with exit code: 0