Here is some code that tries to implement the event listener pattern using `async`/`await` and an `AsyncStream`. Normally I reach for Combine's `PassthroughSubject` for something like this, but I'm experimenting.

I thought the continuation's `onTermination` would be called when the `for` loop finished, but it's not happening. So, the event source doesn't know the listener is gone. Why isn't `onTermination` called here?

Suggestions welcome.
/// Fan-out event source: every stream handed out by `events()` receives each
/// value passed to `fire(_:)`.
///
/// Listener lifetime: a listener is registered when `events()` is called and is
/// dropped when its continuation's `onTermination` handler runs, or when
/// `fire(_:)` finds that the stream has already terminated.
///
/// A consumer leaving its `for await` loop (for example with `break`) does not,
/// by itself, terminate the stream. `onTermination` runs when iteration is ended
/// by task cancellation or when the continuation is finished.
/// NOTE(review): recent toolchains also appear to terminate the stream once the
/// last reference to it (and its iterators) is released — confirm on the
/// toolchain you deploy with before relying on it.
actor EventSource {
    /// Continuations of the currently registered listeners, keyed by a per-listener id.
    private var listeners: [UUID: AsyncStream<Int>.Continuation] = [:]

    deinit {
        // Nobody can call `fire(_:)` any more; finish every stream so consumers
        // are not left suspended in `for await` forever.
        for continuation in listeners.values {
            continuation.finish()
        }
    }

    /// Forgets the listener registered under `id` (no-op if it is already gone).
    private func remove(id: UUID) { listeners.removeValue(forKey: id) }

    /// Registers a new listener and returns the stream it should iterate.
    ///
    /// - Returns: A stream that yields every event fired after this call.
    func events() -> AsyncStream<Int> {
        AsyncStream(Int.self) { cont in
            let id = UUID()
            listeners[id] = cont
            print("listeners.append, count = \(listeners.count)")
            // Capture `self` weakly: the handler is stored by the continuation,
            // which this actor stores in `listeners`; a strong capture would form
            // a retain cycle and keep the source alive as long as any stream exists.
            cont.onTermination = { @Sendable [weak self] reason in
                print("onTermination \(reason)")
                // The handler is synchronous and not actor-isolated, so hop back
                // onto the actor to mutate `listeners`.
                Task { [weak self] in
                    guard let self = self else { return }
                    await self.remove(id: id)
                }
            }
        }
    }

    /// Delivers `event` to every registered listener.
    ///
    /// Listeners whose stream reports `.terminated` are removed immediately
    /// instead of waiting for the asynchronous cleanup scheduled by `onTermination`.
    ///
    /// - Parameter event: The value to yield to each listener.
    func fire(_ event: Int) {
        print("fire \(event), listeners.count = \(listeners.count)")
        // Collect first; the dictionary must not be mutated while it is iterated.
        var terminated: [UUID] = []
        for (id, continuation) in listeners {
            if case .terminated = continuation.yield(event) {
                terminated.append(id)
            }
        }
        for id in terminated {
            remove(id: id)
        }
    }
}
// Single file-level EventSource instance; ContentView both listens to it and fires events through it.
let eventSource = EventSource()
/// Demo view: a button that fires numbered events through the file-level
/// `eventSource`, plus a log of the events this view has received from it.
struct ContentView: View {
    /// Number carried by the next event to fire.
    @State var counter: Int = 1
    /// Accumulated log text, one line per received event.
    @State var eventLog: String = ""

    var body: some View {
        VStack {
            Button { fireEvent() } label: { Text("Fire Event") }
                .padding(.bottom, 20)
            Text("Events:")
            Text(verbatim: eventLog)
        }.task {
            // Iterate the stream directly rather than through a named local, so
            // nothing but the loop itself keeps the stream alive.
            for await e in await eventSource.events() {
                eventLog += "Event \(e)\n"
                // `break` only leaves this loop; it does not by itself tell the
                // source that this listener is gone. The source learns that
                // through the stream's termination (e.g. cancellation of this task).
                // NOTE(review): whether releasing the stream here also triggers
                // `onTermination` depends on the toolchain — confirm.
                if e >= 5 {
                    break
                }
            }
            print("done listening")
        }
    }

    /// Fires an event carrying the current counter value and advances the counter.
    func fireEvent() {
        // Claim the number before hopping to the actor: incrementing only after
        // the `await` let two quick taps read the same value and fire duplicates.
        let event = counter
        counter += 1
        Task {
            await eventSource.fire(event)
        }
    }
}