Excited to see these APIs overhauled to fit better into a Concurrency world.
I have some feedback on the updated APIs.
Protocol hierarchy
The goal that we want to achieve here is that certain Message
s must be received on the MainActor
while others can be received on any isolation domain. To ensure this the proposal creates three protocols:
Message
MainActorMessage: Message
AsyncMessage: Message, Sendable
and two different different shapes of addObserver
methods. One that takes MainActorMessage
s and the other that takes AsyncMessage
s.
I understand the need for MainActorMessage
to be able to constrain the addObserver
API and only allow @MainActor
isolated observer closures. I am wondering if the MainActorMessage
protocol should be constrained to @MainActor
as well. The observation is already constrained to the MainActor
so do we ever expect MainActorMessage
s that are constructed off the MainActor
but must be observed on the MainActor
?
While both post()
methods are called synchronously, only the MainActorMessage
overload delivers synchronously
However, the above sentence implies that MainActorMessages
are delivered synchronously. This can only be the case if they are posted from a MainActor
isolated context. So can we please clarify from which isolation we expect MainActorMessages
to be constructed and posted and if the above statement is then true in all cases.
Edit:
How is it actually possible that we don't constrain the post
method for MainActorMessage
to @MainActor
? In the end, that method has to synchronously invoke the observe closures that are @MainActor
. If the post
method isn't itself constrained to the @MainActor
how are you calling the closures? Are you jumping through a DispatchQueue.main.sync
?
Now onto the AsyncMessage
type. Do we really need this? Specifically do we need the Sendable
requirement on the protocol itself? Wouldn't it be enough if we use the bare Message
protocol and constrain the post
and addObserver
methods to take a some (Message & Sendable)
. If we ever get the tools in the language for dynamic isolation e.g. @isoalted(parameter)
closures, then we could add an optional isolation
property to the Message
protocol. In general, I think less protocols is good and IMO the AsyncMessage
protocol doesn't seem to carry its weight.
Asynchronous message processing
extension NotificationCenter {
public func addObserver<I: MessageIdentifier, M: AsyncMessage>(
of subject: M.Subject,
for identifier: I,
using observer: @escaping @Sendable (M) async -> Void
) -> ObservationToken where I.MessageType == M
}
The various proposed APIs that observe an AsyncMessage
are taking an async
observer closure. This seems problematic to me. On what Task
is this closure executed? Neither the addObserver
method nor the post
methods are async
so the implementation must spawn an unstructured Task
somewhere to run this closure. Implicitly spawned unstructured tasks are very problematic since creating and destroying tasks is not a cheap operation. If this happens on a path that can be triggered by a remote then this can open up a service to denial of service attacks.
IMO these APIs try to accomplish two orthogonal goals
- Observation of
Message
s that are posted on a different isolation then the one which registered the observer
- Asynchronous message processing
The former can be achieved by just marking the observer
closure as @Sendable
and still synchronously invoking the registered observers when post
is called. The latter IMO should use a different API shape that uses the current task to run the observer
closures. Something like this:
extension NotificationCenter {
public func observe<M: Message & Sendable), Failure: Error, Return>(
messageType: M.Type,
isolation: @isolated (any Actor)? = #isolation,
_ body: (some AsyncSequence<M, Never>) async throws(Failure) -> Return
) async throws(Failure) -> Return
}
This would then allow to observe messages like this
notificationCenter.observe(SomeMessage.self) { messages in
for await message in messages {
await process(message)
}
}
This API shape has two benefits. First, it removes the need for spawning an unstructured task to run the async
closures since the caller is now responsible for providing the async context. This still allows the caller to spawn an unstructured task if they want to but also allows them to use structured primitives instead. Secondly, it removes the need for an ObservationToken
since it leverages the scopes that structured concurrency provides.
ObservationToken
These addObserver()
methods return a new ObservationToken
, which can be used with a new removeObserver()
method for faster de-registration of observers:
Can you expand on when an observer is de-registered if removeObserver
is not called?
Should the ObservationToken
be a ~Copyable
type so that it can only be removed once and also just isn't dropped on the floor?