I'm trying to stitch together an implementation of an actor that responds to notification center messages in order to update the actor's mutable state.
let api = await MyAPI(baseUrl: "www.foo.com").notify()
let foo = await api.baseUrl() // "www.foo.com"
NotificationCenter.default
    .post(name: .webServerDidChange, object: "www.bar.com")
let bar = await api.baseUrl() // This is still "www.foo.com"
I believe that sequence is not respecting the actor's executor in this example, but am not finding a way to get this working serially.
Any advice is appreciated, thanks
If you wait for 1 second after the post call, does it work as expected? My expectation is that a notification is not delivered serially but at some future time. (I could be totally wrong on this point though)
Notifications are generally delivered synchronously, and are then buffered, up to some undocumented limit, in the AsyncSequence returned by notifications. But you're racing the for await newValue in sequence loop against the await api.baseUrl() call: nothing in your code ensures that one executes before the other.
(You also have a retain cycle between your actor and the Task, and no way to break it. Generally you will need to ensure that unstructured Tasks are managed at a higher level than the work they do.)
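To illustrate the point about managing the Task at a higher level, here is a minimal sketch for Apple platforms. The original MyAPI implementation isn't shown in the thread, so the actor body below is an assumption, and APIObserver, start(observing:) and stop() are hypothetical names introduced only for this example. The idea is that a separate owner holds the Task and the Task holds the actor weakly, so no cycle forms.

```swift
import Foundation

extension Notification.Name {
    // Assumed definition, mirroring the name used in the question.
    static let webServerDidChange = Notification.Name("webServerDidChange")
}

// Assumed shape of the actor; the original implementation is not shown.
actor MyAPI {
    private var url: String
    init(baseUrl: String) { url = baseUrl }
    func baseUrl() -> String { url }
    func setBaseUrl(to newValue: String) { url = newValue }
}

// Hypothetical owner: it stores the Task, so the actor never retains it.
final class APIObserver {
    private var task: Task<Void, Never>?

    func start(observing api: MyAPI) {
        // Capture the actor weakly so the Task does not keep it alive.
        task = Task { [weak api] in
            let sequence = NotificationCenter.default
                .notifications(named: .webServerDidChange)
            for await note in sequence {
                // Stop observing once the actor has been deallocated.
                guard let api else { return }
                if let url = note.object as? String {
                    await api.setBaseUrl(to: url)
                }
            }
        }
    }

    // Requests cancellation of the observation task.
    func stop() {
        task?.cancel()
        task = nil
    }

    deinit { task?.cancel() }
}
```

This only addresses ownership. It does not change the ordering race between the observation loop and other callers of the actor.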
Say the notification is delivered serially. When for await newValue in sequence is reached, is the following correct?
The runtime could suspend here, allowing let bar = await api.baseUrl() to execute, or it could continue to await self.setBaseUrl(to: url), which could also suspend and allow let bar = await api.baseUrl() to execute?
Or is the issue only that await self.setBaseUrl(to: url) suspends, allowing let bar = await api.baseUrl() to execute?
Or are both possible? Is the await on the sequence more of a “busy work/wait” type lock or is it a possible suspension point for the incoming notification?
This one is closer. Every await is a possible suspension point.
In your case, say:
the Task in your actor is suspended in await iterator.next() in the for await loop
your code synchronously delivers the notification to the async sequence, which unblocks the Task, which probably starts immediately, but might not, depending on how much load the CPU is under
your code then immediately calls await api.baseUrl(), taking the actor's lock
the Task does some more synchronous work, then tries to await setBaseUrl(to:), which will wait until it can get exclusive access to the actor.
So I'd guess that 99+% of the time, there's no way for the code delivering the notification to receive the new base URL, but nothing guarantees that, and there could be cases where it does receive it. For example, if a third task is using the actor when the notification is delivered, the setBaseUrl(to:) call might get in ahead.
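If a guaranteed order is what's needed, one option (an assumption here, not something proposed in the thread) is to skip the notification for this caller and await the actor's setter directly. The sketch below uses an assumed MyAPI body and a hypothetical demo() function. Because the caller awaits setBaseUrl(to:) before reading, the read happens after the write.

```swift
// Assumed shape of the actor; the original implementation is not shown.
actor MyAPI {
    private var url: String
    init(baseUrl: String) { url = baseUrl }
    func baseUrl() -> String { url }
    func setBaseUrl(to newValue: String) { url = newValue }
}

// Hypothetical demo: each await completes before the next line runs,
// so the second read is ordered after the update.
func demo() async -> (before: String, after: String) {
    let api = MyAPI(baseUrl: "www.foo.com")
    let before = await api.baseUrl()
    await api.setBaseUrl(to: "www.bar.com") // returns once the state has changed
    let after = await api.baseUrl()
    return (before, after)
}
```

The notification can still be posted for other observers. The difference is that this caller no longer depends on when the observation Task gets to run.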
This does queue.async under the hood, so this method still won't receive values synchronously. You'd have to remove this and put queue.sync { ... } into sink to actually receive values synchronously.
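As a rough sketch of the queue.sync-inside-sink variant described above, assuming Combine on an Apple platform: the notification name is an assumed definition and postAndCollect() is a hypothetical helper. NotificationCenter's publisher delivers values during post, and queue.sync does not return until its block has run, so the value is recorded before post returns.

```swift
import Combine
import Foundation

extension Notification.Name {
    // Assumed definition, mirroring the name used in the question.
    static let webServerDidChange = Notification.Name("webServerDidChange")
}

// Hypothetical helper: posts one notification and returns what the sink saw.
func postAndCollect() -> [String] {
    let queue = DispatchQueue(label: "SampleQueue")
    var received: [String] = []

    let cancellable = NotificationCenter.default
        .publisher(for: .webServerDidChange)
        .sink { note in
            guard let url = note.object as? String else { return }
            // sync waits for the block to finish before sink returns.
            queue.sync { received.append(url) }
        }

    NotificationCenter.default.post(name: .webServerDidChange, object: "www.bar.com")
    cancellable.cancel()
    return received
}
```

Calling queue.sync from code that is already running on the same serial queue would deadlock, so this pattern only suits callers that post from outside that queue.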
In my class code the queue is still executing the DispatchWorkItems serially (since I didn't create the DispatchQueue with the .concurrent flag). Combine is indeed using queue.async, which schedules the work item asynchronously (i.e., it does not wait for the work item to complete), which is fine for the purposes of this publisher emitting the notifications.
For example:
import Foundation

// A serial queue: no .concurrent attribute, so work items run one at a time, in order.
let queue = DispatchQueue(label: "SampleQueue", qos: .userInitiated)

print("Scheduling first async...")
queue.async {
    Thread.sleep(forTimeInterval: 3)
    print("I'm always completed first")
}

print("Scheduling second async...")
queue.async {
    Thread.sleep(forTimeInterval: 1)
    print("I'm always completed second")
}

print("Scheduling third sync...")
// sync blocks the caller until every earlier work item and this block have finished.
queue.sync {
    print("I'm always completed third")
}

print("... waiting to schedule final async...")
queue.async {
    print("I'm always completed last")
}
Scheduling first async...
Scheduling second async...
Scheduling third sync...
I'm always completed first
I'm always completed second
I'm always completed third
... waiting to schedule final async...
I'm always completed last
Ah right, of course, the code will run DispatchQueue.async first (but not its block yet), then DispatchQueue.sync before the async work has started, but the queue is serial, so the sync call will block until the earlier async work has finished.
So yes, you're right that the order is guaranteed, sorry.