Attempt to use AsyncStream for event listeners

Here is some code that tries to implement the event listener pattern using async/await and an AsyncStream. Normally I reach for Combine's PassthroughSubject for something like this, but I'm experimenting.

I thought the continuation's onTermination would be called when the for loop finished, but it's not happening. So, the event source doesn't know the listener is gone. Why isn't onTermination called here?

Suggestions welcome. :slight_smile:

actor EventSource {
    private var listeners: [UUID: AsyncStream<Int>.Continuation] = [:]
    
    private func remove(id: UUID) { listeners.removeValue(forKey: id) }
    
    func events() -> AsyncStream<Int> {
        AsyncStream(Int.self) { cont in
            let id = UUID()
            listeners[id] = cont
            print("listeners.append, count = \(listeners.count)")
            cont.onTermination = { @Sendable reason in
                print("onTermination \(reason)")
                Task {
                    await self.remove(id: id)
                }
            }
        }
    }
    
    func fire(_ event: Int) {
        print("fire \(event), listeners.count = \(listeners.count)")
        for (_,c) in listeners {
            c.yield(event)
        }
    }
}

let eventSource = EventSource()

struct ContentView: View {
    @State var counter: Int = 1
    @State var eventLog: String = ""
    var body: some View {
        VStack {
            Button { fireEvent() } label: { Text("Fire Event") }
                .padding(.bottom, 20)
            Text("Events:")
            Text(verbatim: eventLog)
        }.task {
            let aseq = await eventSource.events()
            for await e in aseq {
                eventLog += "Event \(e)\n"
                if e >= 5 {
                    break
                }
            }
            print("done listening")
        }
    }
    
    func fireEvent() {
        Task {
            await eventSource.fire(counter)
            counter += 1
        }
    }
}