Actors working serially with NotificationCenter messages

I'm trying to stitch together an implementation of an actor that responds to notification center messages in order to update the actor's mutable state.

I believe I might be running into a concurrency limitation of NotificationCenter thats causing issues (possibly resolved in swift-foundation/Proposals/0010-concurrency-safe-notifications.md at main · swiftlang/swift-foundation · GitHub), but am curious if there are any recommendations to get this working as expected:

actor MyAPI {
    private var baseUrl: URL
    private var notificationTask: Task<Void, Never>?

    init(baseUrl: String) {
        self.baseUrl = URL(string: baseUrl)!
    }

    deinit {
        notificationTask?.cancel()
    }

    func notify() async -> Self {
        notificationTask = Task {
            let sequence = NotificationCenter.default
                .notifications(named: .webServerDidChange)
                .compactMap { $0.object as? String }
            for await newValue in sequence {
                guard !Task.isCancelled else { return }
                guard let url = URL(string: newValue) else {
                    continue
                }
                await self.setBaseUrl(to: url)
            }
        }
        return self
    }

    func baseUrl() async -> URL {
        return baseUrl
    }

    private func setBaseUrl(to newValue: URL) async {
        baseUrl = newValue
    }
}

So then I would expect the following output:

let api = await MyAPI(baseUrl: "www.foo.com").notify()
let foo = await api.baseUrl() // "www.foo.com"
NotificationCenter.default
    .post(name: .webServerDidChange, object: "www.bar.com")
let bar = await api.baseUrl() // This is still "www.foo.com"

I believe that sequence is not respecting the actor's executor in this example, but am not finding a way to get this working serially.
Any advice is appreciated, thanks

If you wait for 1 second after the post call, does it work as expected? My expectation is that a notification is not delivered serially but at some future time. (I could be totally wrong on this point though)

Yes thats correct, sleeping for one second after the notification gets the correct bar value

Notifications are generally delivered synchronously, and will then be buffered up to some undocumented limit in the AsyncSequence returned by notifications. But you're racing the for await newValue in sequence against the await api.baseURL() — nothing in your code will ensure one executes before the other.

(You also have a retain cycle between your actor and the Task that you have no way to break — generally you will need to ensure that unstructured Tasks are managed at a higher level than the work they do)

Now I see the race.

Say the notification is delivered serially, when for await newValue in sequence gets hit is the following correct:
The runtime could suspend here allowing the let bar = await api.baseUrl() to execute or it could continue to await self.setBaseUrl(to: url) which could also suspend to allow the let bar = await api.baseUrl() to execute?

Or is the issue only that await self.setBaseUrl(to: url) suspends allowing let bar = await api.baseUrl() to execute?

Or are both possible? Is the await on the sequence more of a “busy work/wait” type lock or is it a possible suspension point for the incoming notification?

This one is closer. Every await is a possible suspension point.

In your case, say:

  • the Task in your actor is blocked in await iterator.next() in the for await loop
  • your code synchronously delivers the notification to the async sequence, which unblocks the Task, which probably starts immediately, but might not, depending on how much load the CPU is under
  • but then immediately calls await api.baseURL(), taking the actor's lock
  • the task does some more synchronous work, then tries to await setBaseURL(), which will block until it can get exclusive access to the actor.

So I'd guess that 99+% of the time, there's no way for the code delivering the notification to receive the new baseURL, but nothing guarantees that, and there could be cases where it does receive the new baseURL — for example, if a third task is using the actor when the notification is delivered, the setBaseURL() call might get in ahead.

1 Like

Thanks for the thoughts here on the race in the example actor code.

It sounds like the following will be a blocker from moving forward with this code:

and will then be buffered up to some undocumented limit in the AsyncSequence returned by notifications .

Instead I am deferring back to a Sendable conforming class with the use of a private queue, which works as expected with a publisher:

final class MyAPI: @unchecked Sendable {
    private var baseUrl: URL
    private var cancellable: AnyCancellable?
    private let queue = DispatchQueue(label: "MyAPI.queue", qos: .userInitiated)

    init(baseUrl: String) {
        self.baseUrl = URL(string: baseUrl)!
        self.cancellable = NotificationCenter.default
            .publisher(for: .webServerDidChange)
            .receive(on: queue)
            .compactMap { $0.object as? String }
            .compactMap { URL(string: $0) }
            .sink { newValue in
                self.baseUrl = newValue
            }
    }

    deinit {
        cancellable?.cancel()
    }

    func getBaseUrl() -> URL {
        queue.sync {
            return self.baseUrl
        }
    }
}

You can achieve the same thing with checked Sendable without involving Combine, something like:

import Synchronization

final class MyAPI: Sendable {
    struct State {
        var baseURL: URL
        var observationToken: (any NSObjectProtocol)?
    }

    let state: Synchronization.Mutex<State>

    init(baseURL: String) {
        self.state = Mutex(State(
            baseURL: URL(string: baseURL)!,
            observationToken: nil
        ))
        let observationToken = NotificationCenter.default.addObserver(
            forName: .webServerDidChange,
            object: nil,
            queue: nil
        ) { [weak self] in
            guard let string = $0.object as? String, let baseURL = URL(string: string) else {
                return
            }
            self?.state.withLock {
                $0.baseURL = baseURL
            }
        }
        state.withLock {
            $0.observationToken = observationToken
        }
    }

    deinit {
        state.withLock {
            if let observationToken = $0.observationToken {
                NotificationCenter.default.removeObserver(observationToken)
            }
        }
    }
}

(Substitute OSAllocatedUnfairLock if Mutex is unavailable)

This does queue.async under the hood, so this method still won't receive values synchronously. You'd have to remove this and put queue.sync { ... } into sink to actually receive values synchronously.

Ah thanks for the Combine-less alternative.

In my class code the queue is still executing the DispatchWorkItems serially (since I didn't create the DispatchQueue with the .concurrent flag). Combine is indeed using queue.async which schedules the work item asynchronously (ie it does not wait for the WorkItem to complete) which is fine for the purposes of this publisher emitting the notifications.

For example:

let queue = DispatchQueue(label: "SampleQueue", qos: .userInitiated)
print("Scheduling first async...")
queue.async {
    Thread.sleep(forTimeInterval: 3)
    print("I'm always completed first")
}
print("Scheduling second async...")
queue.async {
    Thread.sleep(forTimeInterval: 1)
    print("I'm always completed second")
}
print("Scheduling third sync...")
queue.sync {
    print("I'm always completed third")
}
print("... waiting to schedule final async...")
queue.async {
    print("I'm always completed last")
}
Scheduling first async...
Scheduling second async...
Scheduling third sync...
I'm always completed first
I'm always completed second
I'm always completed third
... waiting to schedule final async...
I'm always completed last

Ah right, of course, the code will run DispatchQueue.async first (but not its block yet), then DispatchQueue.sync before the async work has started, but the queue is serial, so the sync will block awaiting the async work.

So yes, you're right that the order is guaranteed, sorry.