Excited to see these APIs overhauled to fit better into a Concurrency world.
I have some feedback on the updated APIs.
Protocol hierarchy
The goal here is that certain Messages must be received on the MainActor, while others can be received in any isolation domain. To ensure this, the proposal creates three protocols:
Message
MainActorMessage: Message
AsyncMessage: Message, Sendable
and two different shapes of addObserver methods: one that takes MainActorMessages and another that takes AsyncMessages.
I understand the need for MainActorMessage to be able to constrain the addObserver API and only allow @MainActor isolated observer closures. I am wondering if the MainActorMessage protocol should be constrained to @MainActor as well. The observation is already constrained to the MainActor so do we ever expect MainActorMessages that are constructed off the MainActor but must be observed on the MainActor?
While both post() methods are called synchronously, only the MainActorMessage overload delivers synchronously
However, the above sentence implies that MainActorMessages are always delivered synchronously. This can only be the case if they are posted from a MainActor-isolated context. So can we please clarify from which isolation we expect MainActorMessages to be constructed and posted, and whether the above statement is then true in all cases?
Edit:
How is it actually possible that we don't constrain the post method for MainActorMessage to @MainActor? In the end, that method has to synchronously invoke the observer closures, which are @MainActor. If the post method isn't itself constrained to @MainActor, how are you calling the closures? Are you jumping through a DispatchQueue.main.sync?
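To make the question concrete, here is a minimal sketch of the two options I can see. MiniCenter and both post variants are hypothetical stand-ins, not the proposed API. Either post is itself @MainActor and the compiler checks the synchronous call, or post is callable from anywhere and has to assert the isolation at runtime, for example with MainActor.assumeIsolated, which traps when called off the main actor.

```swift
// Hypothetical stand-in for the proposed center; names are assumptions.
@MainActor
final class MiniCenter {
    private var observers: [@MainActor (String) -> Void] = []

    func addObserver(_ observer: @escaping @MainActor (String) -> Void) {
        observers.append(observer)
    }

    // Option A: post inherits @MainActor from the class, so the
    // synchronous calls below are checked statically.
    func post(_ message: String) {
        for observer in observers { observer(message) }
    }

    // Option B: callable from any isolation, but the isolation is only
    // asserted dynamically. assumeIsolated traps off the main actor.
    nonisolated func postFromAnywhere(_ message: String) {
        MainActor.assumeIsolated {
            self.post(message)
        }
    }
}
```

With option B, "delivers synchronously" only holds for callers that already happen to be on the main actor, which is the case I would like the proposal to spell out.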
Now onto the AsyncMessage type. Do we really need this? Specifically, do we need the Sendable requirement on the protocol itself? Wouldn't it be enough to use the bare Message protocol and constrain the post and addObserver methods to take a some (Message & Sendable)? If we ever get the tools in the language for dynamic isolation, e.g. @isolated(parameter) closures, then we could add an optional isolation property to the Message protocol. In general, I think fewer protocols is better, and IMO the AsyncMessage protocol doesn't seem to carry its weight.
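A rough sketch of what I mean, using a heavily simplified, hypothetical Message protocol (the name requirement and the post function below are only there for illustration). The Sendable requirement sits on the method rather than on a separate protocol:

```swift
// Simplified stand-in; the proposed Message protocol has other requirements.
protocol Message {
    static var name: String { get }
}

struct Ping: Message, Sendable {
    static let name = "Ping"
    let id: Int
}

// A message type that is not Sendable.
final class Handle: Message {
    static let name = "Handle"
    var count = 0
}

// Hypothetical post: only messages that are also Sendable are accepted.
func post(_ message: some Message & Sendable) -> String {
    "posted \(type(of: message).name)"
}

let result = post(Ping(id: 1))
// post(Handle())  // diagnosed: Handle does not conform to Sendable
//                 // (an error in the Swift 6 language mode)
```

Message types opt into cross-isolation delivery simply by being Sendable, with no third protocol involved.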
Asynchronous message processing
extension NotificationCenter {
    public func addObserver<I: MessageIdentifier, M: AsyncMessage>(
        of subject: M.Subject,
        for identifier: I,
        using observer: @escaping @Sendable (M) async -> Void
    ) -> ObservationToken where I.MessageType == M
}
The various proposed APIs that observe an AsyncMessage take an async observer closure. This seems problematic to me. On what Task is this closure executed? Neither the addObserver method nor the post methods are async, so the implementation must spawn an unstructured Task somewhere to run this closure. Implicitly spawned unstructured tasks are very problematic since creating and destroying tasks is not a cheap operation. If this happens on a path that can be triggered by a remote peer, it can open up a service to denial-of-service attacks.
IMO these APIs try to accomplish two orthogonal goals:
- Observation of Messages that are posted on a different isolation than the one which registered the observer
- Asynchronous message processing
The former can be achieved by just marking the observer closure as @Sendable and still synchronously invoking the registered observers when post is called. The latter IMO should use a different API shape that uses the current task to run the observer closures. Something like this:
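For the first goal, a minimal sketch of what I have in mind. SyncCenter is a hypothetical type, and the NSLock is just one way to protect the observer list. Observers are @Sendable but not async, and post calls them synchronously on whatever thread it is called from:

```swift
import Foundation

// Hypothetical center; @unchecked Sendable because the lock guards all state.
final class SyncCenter<M: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var observers: [@Sendable (M) -> Void] = []

    func addObserver(_ observer: @escaping @Sendable (M) -> Void) {
        lock.lock()
        defer { lock.unlock() }
        observers.append(observer)
    }

    func post(_ message: M) {
        // Copy under the lock, then call outside it so that an observer
        // can register further observers without deadlocking.
        lock.lock()
        let snapshot = observers
        lock.unlock()
        for observer in snapshot { observer(message) }
    }
}
```

No Task is created anywhere; the cost of a post is just the cost of calling the closures.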
extension NotificationCenter {
    public func observe<M: Message & Sendable, Failure: Error, Return>(
        messageType: M.Type,
        isolation: isolated (any Actor)? = #isolation,
        _ body: (some AsyncSequence<M, Never>) async throws(Failure) -> Return
    ) async throws(Failure) -> Return
}
This would then allow observing messages like this:
await notificationCenter.observe(messageType: SomeMessage.self) { messages in
    for await message in messages {
        await process(message)
    }
}
This API shape has two benefits. First, it removes the need for spawning an unstructured task to run the async closures since the caller is now responsible for providing the async context. This still allows the caller to spawn an unstructured task if they want to but also allows them to use structured primitives instead. Secondly, it removes the need for an ObservationToken since it leverages the scopes that structured concurrency provides.
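To illustrate why no token is needed, here is a simplified sketch of how such a scoped API could be backed by an AsyncStream. ScopedCenter and its helpers are hypothetical, and the sketch leaves out the isolation parameter and typed throws for brevity; a real version would want the isolation parameter so that body runs on the caller's actor.

```swift
import Foundation

// Hypothetical center; the lock guards the continuation table.
final class ScopedCenter<M: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuations: [UUID: AsyncStream<M>.Continuation] = [:]

    func post(_ message: M) {
        lock.lock()
        let targets = Array(continuations.values)
        lock.unlock()
        // Yielding only buffers the message; no task is spawned here.
        for continuation in targets { continuation.yield(message) }
    }

    func observe<Return>(
        _ body: (AsyncStream<M>) async throws -> Return
    ) async rethrows -> Return {
        let (stream, continuation) = AsyncStream<M>.makeStream()
        let id = register(continuation)
        // Registration ends with the scope, on return or on throw.
        defer { unregister(id) }
        return try await body(stream)
    }

    private func register(_ continuation: AsyncStream<M>.Continuation) -> UUID {
        lock.lock()
        defer { lock.unlock() }
        let id = UUID()
        continuations[id] = continuation
        return id
    }

    private func unregister(_ id: UUID) {
        lock.lock()
        let continuation = continuations.removeValue(forKey: id)
        lock.unlock()
        continuation?.finish()
    }
}
```

The observer is registered when observe is entered and removed when body returns or throws, and the messages are consumed on the caller's own task.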
ObservationToken
These addObserver() methods return a new ObservationToken, which can be used with a new removeObserver() method for faster de-registration of observers:
Can you expand on when an observer is de-registered if removeObserver is not called?
Should the ObservationToken be a ~Copyable type, so that an observer can only be removed once and the token can't just be dropped on the floor?
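A small sketch of the idea, with a hypothetical Registry standing in for the notification center and a hypothetical Token type. One caveat: a ~Copyable value can still be dropped implicitly, so the sketch de-registers in deinit, which would also give a defined answer to the question of what happens when removeObserver is never called.

```swift
// Hypothetical stand-in for the center's observer table.
final class Registry {
    private(set) var active: Set<Int> = []
    private var nextID = 0

    func addObserver() -> Token {
        nextID += 1
        active.insert(nextID)
        return Token(id: nextID, registry: self)
    }

    fileprivate func remove(_ id: Int) {
        active.remove(id)
    }
}

// Hypothetical noncopyable token: there is exactly one owner at a time.
struct Token: ~Copyable {
    fileprivate let id: Int
    fileprivate let registry: Registry

    // `consuming` ends the token's lifetime, so calling remove() twice on
    // the same token is a compile-time error. The body is empty because
    // consuming self runs deinit, which does the de-registration.
    consuming func remove() {}

    deinit {
        registry.remove(id)
    }
}
```

Removal is still explicit for callers that want it, and a token that goes out of scope cleans up after itself.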