I'd like to share some of my thoughts on this topic; they might or might not turn into something useful, but I think it's still helpful to post them here.
- I consider actor, transport, and serialization to be three separate concepts/components which together compose a distributed actor system.
- A transport could be decoupled from an actor. Sometimes it's useful to make an actor reachable via multiple different transports and/or protocols, e.g. all of the actors are reachable at a given IP:port in the local DC but only some are reachable cross-DC or globally. I think this fits well with the example already mentioned in the discussion here, where there is one primary or coordinator and N secondary replicas, and only the primary or coordinator is reachable to clients.
Instead of requiring a "transport" to be passed to a distributed actor, we could split some of that functionality out into a "manager" which would handle actor state (up/down) and deal with identities, so the "transport" would only be responsible for serialization and data transfers. The idea here is that the manager is a "discovery" mechanism which could operate on its own transport/protocol and connect actors to transports. This is useful because discovery mechanisms are usually a separate concern in practice, and actors don't always want to be accessible to other actors:
protocol ActorManager {
    associatedtype ActorType

    // Note: "identity" type should be made a property of actor manager...
    func register(_: ActorType)
    func identify(_ range: ActorType) -> RangeID
    /// Retrieve an actor responsible for the given range id
    func resolve(id: RangeID) throws -> ActorType
    func makeReachable(_: ActorType, via: [ActorTransport])
}
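The sketches in the rest of this post lean on a few supporting types that aren't spelled out anywhere. None of these are proposed API; they're just illustrative stand-ins so the later snippets have concrete types to refer to (in particular, ActorTransport is treated as a plain enum of endpoints purely for illustration):

// Purely illustrative stand-ins, not proposed API.
enum ActorTransport {
    case tcp(host: String, port: Int)   // cross-process / cross-node
    case udp(host: String, port: Int)   // used later in the serialization examples
    case ipc(path: String)              // optimized same-machine transport
}

typealias Token = UInt64                // position on the ring
typealias RangeID = String              // globally unique identifier of a token range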
Based on this manager interface, let's define a Ring of tokens:
class Ring : ActorManager {
    // Just for presentation purposes, to make sure that this manager handles only TokenRange
    typealias ActorType = TokenRange

    // init(...)

    // Registers a range of tokens in the system.
    //
    // The range is local until marked as reachable via calling `makeReachable`.
    func register(_: TokenRange) { ... }

    // Global unique identifier of the range.
    func identify(_ range: TokenRange) -> RangeID { ... }

    // Retrieve an actor responsible for the given range id
    func resolve(id: RangeID) throws -> TokenRange { ... }

    // Makes the given actor reachable from other processes/nodes via the given
    // set of protocols.
    func makeReachable(_ range: TokenRange, via: [ActorTransport]) {
        ...
    }
}
Each of the tokens can hold some "data". Tokens are organized into contiguous ranges via the TokenRange distributed actor:
distributed actor TokenRange {
    let ring: Ring
    var storage: [Token: Data]

    init(ring: Ring, range: (Token, Token)) {
        self.ring = ring
        self.storage = ... // <- One can imagine a situation where existing data has to be loaded, which could take some amount of time

        // Make sure that the system knows about this "local" range, similar to `actorReady` but not exactly
        defer {
            ring.register(self)
        }
    }

    [distributed] func read(at loc: Token) [async throws] -> Data? { ... }
    [distributed] func write(to loc: Token, contents: Data) [async throws] -> Result<Data, Error> { ... }
}
The system so far could look something like this:
// ~~~~~~~~~ Node 1 ~~~~~~~~~~~
let ring = Ring(<some seeds>)

let range1 = TokenRange(ring: ring, range: (0, 255))
let range2 = TokenRange(ring: ring, range: (255, 500))

// ... Do some data loading or other bootstrap operations in parallel ...

ring.makeReachable(range1, via: [.tcp(host: ..., port: ...), .ipc(...)])
ring.makeReachable(range2, via: [.tcp(host: ..., port: ...), .ipc(...)])

// ~~~~~~~~~ Node 2 ~~~~~~~~~~~
let ring = Ring(<some seeds>)

let range3 = TokenRange(ring: ring, range: (500, 0))
ring.makeReachable(range3, via: [.tcp(host: ..., port: ...), .ipc(...)])
So the Ring knows about all three token ranges, where they are located, and how they could be accessed.
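For example, a sketch of reaching the Node 2 range from Node 1 could look like the following, assuming range3ID is a RangeID that Node 1 learned through the ring's own discovery (that part is left out here):

// ~~~~~~~~~ Node 1 ~~~~~~~~~~~
// Sketch only: `range3ID` is assumed to have been discovered via the ring.
// The resolved actor would be backed by a remote proxy, so the read below
// travels over one of the transports that range3 was made reachable via.
let remoteRange = try ring.resolve(id: range3ID)
let value = try await remoteRange.read(at: 600) // token 600 falls into (500, 0)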
We could also consider a slice command that returns an iterator over the data for a given range, e.g.:
extension TokenRange {
    [distributed] func slice(range: (Token, Token)) [async throws] -> AsyncIterator<(Token, Data)> {
        ...
    }
}
Such an operation requires inter-range collaboration (because the specified range could span multiple actors), so it could take advantage of the fact that multiple ranges could be co-located on the same process/machine and use an optimized transport, in our example IPC between range1 and range2.
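A minimal sketch of that fan-out, assuming a hypothetical rangeIDs(overlapping:) helper on Ring and a localSlice method that returns only the tokens an actor owns (returning an array instead of an AsyncIterator, and ignoring wrap-around ranges, for brevity):

extension TokenRange {
    // Sketch: fan the request out to every range that overlaps the requested
    // slice and merge the results. Co-located peers would come back from
    // `resolve` wired to the optimized (e.g. IPC) transport.
    distributed func slice(range: (Token, Token)) async throws -> [(Token, Data)] {
        var result: [(Token, Data)] = []
        for id in ring.rangeIDs(overlapping: range) {   // hypothetical helper
            let peer = try ring.resolve(id: id)         // local or remote TokenRange
            result += try await peer.localSlice(range: range)
        }
        return result.sorted { $0.0 < $1.0 }
    }

    // Returns only the tokens this actor owns that fall into `range`.
    distributed func localSlice(range: (Token, Token)) async throws -> [(Token, Data)] {
        storage
            .filter { range.0 <= $0.key && $0.key < range.1 }
            .map { ($0.key, $0.value) }
    }
}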
Now, I'd like to bulk load data into the ring; for that I'd define another distributed actor called BulkLoader and its manager:
struct LoaderManager : ActorManager {
    typealias ActorType = BulkLoader
    ...
}

distributed actor BulkLoader {
    init(manager: LoaderManager, ...) {
        ...
    }

    [distributed] func load(ring: Ring, data: [Token: Data]) [async throws] -> Result<Bool, Error> {
        ...
    }
}
I could start with one local loader, which is simple to implement but could be slow:
// Information about the token range we are about to load data into
let ring = Ring(<some seeds>)

let loaderManager = LoaderManager(<some seeds>)
let loader = BulkLoader(manager: loaderManager)

await loader.load(ring: ring, data: [
    0: "a",
    257: "b",
    765: "c"
])
In this case the bulk loader is not reachable to others at all (but it is still discoverable in the same process), and it takes advantage of the fact that the connection between actors is duplex, which makes it possible for the remote peer to send data back over the already established connection.
Another possibility is to distribute loaders to different machines and feed them data so it could be loaded locally via IPC:
// ~~~~~~~~~ Node 0 ~~~~~~~~~~~
let ring = Ring(<some seeds>)

// Information about all of the bulk loaders that currently operate on local/remote machine(s),
// which might or might not be co-located with token ranges
let loaderManager = LoaderManager(<some seeds>)

let partitionedData: [RangeID: [Token: Data]] = ... // partition data based on known ranges

// Could be done in parallel too (see the sketch below)
for partition in partitionedData {
    let loader = try loaderManager.resolve(id: partition.key)
    await loader.load(ring: ring, data: partition.value)
}
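As a sketch of the parallel variant hinted at in the comment above, a throwing task group could load each partition concurrently (same hypothetical types and variables as in the previous snippet):

// Sketch: one child task per partition; `resolve` may throw, so a throwing group is used.
try await withThrowingTaskGroup(of: Void.self) { group in
    for partition in partitionedData {
        group.addTask {
            let loader = try loaderManager.resolve(id: partition.key)
            _ = await loader.load(ring: ring, data: partition.value)
        }
    }
    try await group.waitForAll()
}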
The bulk loaders and token ranges don't really know anything about each other and might not be reachable in the same way; two connections are not required to send/receive the data. If it were possible for the ring to detect that a bulk loader is co-located with a particular token range, then it could use the .ipc transport instead of .tcp to transfer the data between them, which is a big advantage.
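A hedged sketch of that selection, where ProcessLocation and the bookkeeping behind it are purely hypothetical (in a real manager this information would be derived from register/makeReachable calls):

// Hypothetical bookkeeping: where a registered actor lives relative to the caller.
enum ProcessLocation {
    case sameProcess
    case sameMachine(ipcPath: String)
    case remote(host: String, port: Int)
}

// Sketch: given where the target lives, pick the cheapest transport.
func preferredTransport(for location: ProcessLocation) -> ActorTransport? {
    switch location {
    case .sameProcess:
        return nil // direct local call, no transport needed
    case .sameMachine(let path):
        return .ipc(path: path)
    case .remote(let host, let port):
        return .tcp(host: host, port: port)
    }
}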
- Each of the actors could specify its serialization interface explicitly, or default to one (i.e. Codable) via a typealias. The idea here is that serialization is separate from transport, and the same serialization method could be used with a variety of different transports, e.g. sending protobufs over TCP/UDP/IPC. At the same time we'd still want to be able to statically check the location-agnostic methods exposed by the actor, to make sure that everything would be serializable instead of waiting for the system to crash at runtime.
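As a sketch of what such an explicit choice might look like on the BulkLoader from above (ProtobufSerializable is a hypothetical marker protocol, not an existing library API):

// Hypothetical marker protocol naming an alternative serialization mechanism.
protocol ProtobufSerializable {}

distributed actor BulkLoader {
    // Defaulting to Codable could be spelled the same way; here the actor opts
    // into protobuf-based serialization, independent of whether the transport
    // underneath is TCP, UDP, or IPC.
    typealias SerializableVia = ProtobufSerializable

    // The compiler could then check statically that every parameter and return
    // type of a distributed method conforms to `SerializableVia`.
    [distributed] func load(ring: Ring, data: [Token: Data]) [async throws] -> Result<Bool, Error> { ... }
}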
- Enabling proxies via a location-agnostic actor interface instead of inline code generation.
This builds on the previous point about decoupling transport from an actor, because if we do that it wouldn't be possible to generate the thunks as described in the pitch.
To enable this, the compiler could split the original actor declaration into:
- A protocol that specifies the location-agnostic interface of a distributed actor (only distributed methods are visible there);
- A location-agnostic actor interface (this is what gets returned from ActorManager);
- A local actor (with properties and internal methods as declared by the user);
- A remote proxy that doesn't expose anything except "public" distributed entry points.
Let's go back to our example from above and see how TokenRange could be transformed into this scheme. The protocol to which local/remote token ranges have to conform:
protocol _TokenRange : DistributedActor {
    typealias SerializableVia = Codable

    [distributed] func read(at loc: Token) async throws -> Data?
    [distributed] func write(to loc: Token, contents: Data) async throws -> Result<Data, Error>
}
The location-agnostic interface that would be returned from Ring based on some RangeID:
// `class` here because we don't want to have to hop to another executor just to then enter `impl`'s context.
class TokenRange {
    private let impl: some _TokenRange

    // Preserves the interface initializers
    public init(ring: Ring, range: (Token, Token)) {
        self.init(impl: _LocalTokenRange(ring: ring, range: range))
    }

    [hidden] init<T: _TokenRange>(<identity?>, impl: T) {
        self.impl = impl
    }

    func read(at loc: Token) async throws -> Data? {
        return try await impl.read(at: loc)
    }

    func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
        return try await impl.write(to: loc, contents: contents)
    }
}
The local version of the distributed actor TokenRange that has the properties and all of the internal methods:
[hidden] actor _LocalTokenRange : _TokenRange {
    // Note that "local" actor doesn't need `SerializableVia`

    let ring: Ring
    var storage: [Token: Data]

    init(ring: Ring, range: (Token, Token)) {
        ...
    }

    func read(at loc: Token) async throws -> Data? {
        // code from user version of `TokenRange`
    }

    func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
        // code from user version of `TokenRange`
    }
}
And a remote version, which could use either nonisolated or @unsafe methods to witness the requirements of _TokenRange in order to avoid an extra hop:
[hidden] actor _RemoteTokenRange : _TokenRange {
    typealias SerializableVia = Codable

    let transport: ActorTransport

    init(id: <identifier>, transport: ActorTransport) {
        self.transport = <initialize transport using `SerializableVia` type>
    }

    nonisolated func read(at loc: Token) async throws -> Data? {
        return try await transport.call(\.read, with: [loc], ...)
    }

    nonisolated func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
        return try await transport.call(\.write, with: [loc, contents], ...)
    }
}
Now, Ring could implement resolve in the following manner:
class Ring : ActorManager {
    func resolve(id: RangeID) throws -> ActorType {
        if let transport = isRemote(id) {
            return ActorType.init([id]?, transport: transport)
        }
        return <local instance registered with the system>
    }
}
So when the range (500, 0) is requested on Node 1 from our original example, Ring would return a TokenRange instance backed by a _RemoteTokenRange.
We could apply all these changes to the chat room example mentioned in the pitch. The main function would become a lot more straightforward, because only the ChatRoom actor needs to be reachable to the Chatters, since all communication is multiplexed through it; logically, Chatter actors shouldn't really be reachable to each other, or even to the chat room itself (unless they initiate the connection with the chat room), e.g.:
class Chat : ActorManager {
    typealias ActorType = ChatRoom
    ...
}

class Chatters : ActorManager {
    typealias ActorType = Chatter
    ...
}
and in main:
let chat = Chat(<seeds>)
let room = ChatRoom(topic: "Cute Capybaras")
chat.makeReachable(room, via: [.tcp(host: ..., port: ...)])

let chatters = Chatters()

let alice = Chatter(manager: chatters)
let bob = try chatters.resolve(id: <bob id>)
let charlie = Chatter(manager: chatters)

for chatter in [alice, bob, charlie] {
    Task {
        let room = try chat.resolve(id: room.id) // this would resolve as a local room every time
        try await chatter.join(room: room)
    }
}