[Pitch] Distributed Actors

Without being qualified to judge the necessity of making this a language feature instead of a library, I’ll add my concurrence with Jordan’s general philosophy here: Swift should, when possible, prefer adding language features to make a thing possible over adding that thing to the language directly.

I thought SwiftUI was an excellent model for this approach: property wrappers and function builders are finding excellent uses well beyond SwiftUI, even though SwiftUI was the star client for them.

If distributed actors do in fact need new features in the language itself, it’s perhaps worth a design pass to consider whether smaller, more composable language features might serve those unmet needs.

13 Likes

+1 for breaking language features down as much as possible. That also reduces the need for future language changes to add new functionality.

Just want to confirm my understanding of the section “Transporting Errors”: it seems it should be possible to implement a transport that returns from the await once, e.g., a message has been enqueued to the network (but can throw if that fails for some reason), with no requirement to await any reply from the remote end (for distributed actor methods that don't throw and return Void).

Specifically this allows for streaming-style calls of distributed actor methods that are one way without requiring round trips. So it’ll be up to the transport on how to handle this afaict (the other end of the spectrum could be a transport that will round trip so we don’t return until we know a message was delivered).

Yeah that's correct. I know other proposal co-authors are a bit nervous about such "does not wait" calls, but I think they are tremendously useful, especially in distribution where one really does not want to wait for replies sometimes, and just best-effort shoot messages over, getting acks back asynchronously, e.g. in batch. It could be done perhaps not with Void but some "DontWaitVoid" return type or something silly... (that the transport understands) :thinking:
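To make the two ends of that spectrum concrete, here is a minimal sketch. It assumes a hypothetical `DontWaitVoid` marker type and a toy in-memory `ToyTransport`; neither is part of the pitch, and all names are made up for illustration. It is synchronous to stay small, whereas a real transport would expose these calls as `async throws`.

```swift
/// Hypothetical marker return type: a transport that sees it does not wait for a reply.
struct DontWaitVoid {}

enum ToyTransportError: Error { case outboxFull }

/// Toy transport: an in-memory outbox stands in for the network.
final class ToyTransport {
    private(set) var outbox: [String] = []
    private(set) var repliesAwaited = 0
    private let capacity: Int

    init(capacity: Int) { self.capacity = capacity }

    /// Enqueueing can fail locally, which is why even a one-way call may throw.
    private func enqueue(_ message: String) throws {
        guard outbox.count < capacity else { throw ToyTransportError.outboxFull }
        outbox.append(message)
    }

    /// One-way call: returns as soon as the message is enqueued.
    func call(_ message: String, returning _: DontWaitVoid.Type) throws -> DontWaitVoid {
        try enqueue(message)
        return DontWaitVoid()
    }

    /// Request/reply call: enqueues, then waits for the (simulated) remote reply.
    func call<Reply>(_ message: String, remoteReply: () -> Reply) throws -> Reply {
        try enqueue(message)
        repliesAwaited += 1
        return remoteReply()
    }
}

let transport = ToyTransport(capacity: 2)
_ = try transport.call("tick", returning: DontWaitVoid.self)  // no reply awaited
let answer = try transport.call("ask") { 42 }                 // reply awaited
```

The point of the marker type is that the decision lives in the method signature, so the transport can pick the cheaper path without any new language syntax.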


I would love to have a real uni-directional call capability in the language, and it's popping up here and there already. For example, in Kavon's initializer work, SE-0327: On Actors and Initialization (under review now), there is an assumption that:

// NOTE: Task.detached is _not_ an exact substitute for this.
// It is expected that Custom Executors will provide a capability
// that implements this function, which atomically enqueues a paused task
// on the target actor before returning.
func spawnAndEnqueueTask<A: AnyActor>(_ a: A, _ f: () -> Void) { ... }

would exist. Such an operation is what allows for such a construct on the language level... I don't know if we'll ever get such a "send" or not, but it has certainly come up a few times and I'd personally advocate for it; it solves a lot of potential "high level" races. This is similar in the sense that it makes a new task, but it is known that we'll never wait for its result. If we knew we were in such a call, perhaps we could automatically do the "don't reply"... This is WILD SPECULATION though, not something we planned / designed so far.

I think what you ask for would be explicit in the types for now. And I agree there are good use-cases for uni-directional calls (ha, like Akka's good ol' "tell (fire and forget), don't ask (request reply)" motto :wink:).

1 Like

Hi Jordan, thanks for chiming in :slight_smile:

You likely won't be surprised that work towards distributed actors isn't new, and thanks to our open sourcing of the ongoing distributed actor cluster work last week I can even share some additional details here:

We've been working on this for a long time, since even before Swift had actors in the language (feel free to check the repo). One could easily ask "why do we need actors in the language if it could be a library?" The answer is that the high level of language integration and benefits of these concepts in the language by far outweigh the costs; the isolation enabled by actors, automatic hopping etc, are truly some great and exciting wins for ergonomics, understandability, and teachability for Swift IMHO. The same is true for distribution.


Funnily enough: sure, the internal representation of a distributed actor, implemented only as a library, is similar to such an enum:

// Refs.swift
public struct _ActorRef<Message: ActorMessage>: @unchecked Sendable, ... {
    /// INTERNAL API: May change without further notice.
    /// The actor ref is "aware" whether it represents a local, remote or otherwise special actor.
    ///
    /// Adj. self-conscious: feeling undue awareness of oneself, one's appearance, or one's actions.
    public enum Personality {
        // TODO(distributed): introduce new 'distributed actor' personality that replaces all other ones
        case cell(_ActorCell<Message>) // "local"
        case remote(_RemoteClusterActorPersonality<Message>) // "remote"
        // ... more 'special' things here
    }

    internal let personality: Personality
}

:slightly_smiling_face:

We also explored such an implementation approach in the language, where we synthesize storage as such an enum, and functions switch over it etc. I can assure you it ends up more complex than what we are proposing here, due to how state works in these actors: it would require "rewriting" all functions to access their state through this enum, and it can't participate in isolation as well as the current proposal does (it would end up having a lot of hacks).
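For a feel of what "functions switch over it" means in practice, here is a toy sketch. `CounterRef`, its cases, and the `outgoing` log are all hypothetical and much simpler than the real `_ActorRef`; the sketch only illustrates the shape of the problem.

```swift
/// Hypothetical counter whose storage is a local/remote enum.
final class CounterRef {
    enum Personality {
        case local(count: Int)        // state lives in this process
        case remote(address: String)  // only an address; no state here
    }

    private var personality: Personality
    private(set) var outgoing: [String] = []  // stands in for messages sent over the wire

    init(_ personality: Personality) { self.personality = personality }

    /// Every method has to switch, and every state access goes through the enum payload.
    func increment() -> Int? {
        switch personality {
        case .local(let count):
            personality = .local(count: count + 1)
            return count + 1
        case .remote(let address):
            outgoing.append("increment@\(address)")
            return nil  // a real reply would arrive asynchronously
        }
    }
}

let localCounter = CounterRef(.local(count: 0))
let remoteCounter = CounterRef(.remote(address: "node-2"))
```

Even in this tiny case the stored state is no longer a plain property, which hints at why rewriting every function of a real actor this way gets complicated.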

To be clear though, such ActorRef<Message> isn't nearly enough to do something useful. In addition to what I mentioned above, we also have entertained and implemented multiple approaches to actor APIs like this, including a rather pure and functional Behavior style approach (similar to Akka Typed), and even a Source Generation based approach (that you are suggesting), but very limited in its ability to support protocols and protocol oriented programming... and after years of trying these different approaches, we are not satisfied with the results, and only the language integration provides the necessary tools for it all to feel right and "just work".

Source generation especially is very limited: there is no way to truly support all the things we want to without reinventing the entire type system in some ad-hoc random "try matching strings" fashion, re-constructing everything from the parsed sources. As a matter of fact, a large contributing use-case for SwiftPM plugins was at some point the idea that we could get away with source generation -- this turned out to be false. The user experience is not good enough, and there are important limitations, deeply rooted in the type and build systems, that we can't work around with source generation alone.


To take a step back here though. I understand the general wish to have a more powerful language that would allow us to express this using other features, however: realistically, this would require an incredibly powerful macro system; not just expanding or wrapping simple functions, but changing allocation of entire types, and affecting their extensions throughout all modules etc. Otherwise we'd end up with unsound or unsafe ways to use such actors. The complexity of that would be far greater than what we are proposing.

I'll also say that we believe that this feature is fairly small in its "complexity added" because of how coherent it is with the existing actor model in Swift: most of the things this adds are expressed in terms of actor isolation (!), and the only new capabilities are the proxying abilities of distributed functions, and the initialization/lifecycle. Thanks to being exposed as one coherent feature, and not 5 features that users have to clunkily slap together when they try to use these types of actors, I believe this is a much simpler feature than the attempt of creating many features that can combine into this.

More features mean more moving parts and more complexity, whereas this feature has a specific purpose in life, is strictly based on the foundations of the already existing actor model and the well known concept of distributed actors, and is honestly going to very much outshine a lot of other ones out there.

One last point I'd like to make is that while indeed our reference implementation and focus is the cluster library, please do keep in mind how general purpose this language feature is -- it is very much designed to be open and extensible for other transports (this is fairly unique for such thing by the way). And in a world where distributed computing is basically everywhere, I think this is an amazing step to take for a language so uniquely positioned as Swift :slight_smile:

// edits: rephrasing and a few typos

13 Likes

Awesome, for several use cases in a geographically distributed setup it's a must for some pub/sub || async callback patterns that would be critical for practical use, so no need to be nervous ;-)

With regard to using a specific return type || keyword, in DO, this was accomplished with the oneway keyword, quoting the documentation:

For methods without a return value, the method can be declared with the oneway keyword to indicate that the message should be sent asynchronously. The client does not block in that case and continues running once the message is sent.

It would make sense to have some sort of return type for that so a transport can choose the correct semantics (sync or async vis-à-vis the remote actor before continuing, so to bikeshed... AsyncVoid - it becomes a bit meta with 'async async' though :slight_smile: ).

The real uni-directional call capability you describe would be a super nice way to do it, but I'm more than happy with return type annotation, thanks.

Haha, right — this was brought to my attention by some Apple people familiar with DO during earlier design iterations… Indeed, that’s pretty much it.

Yeah, I’m mostly thinking about event handlers, or like pub-sub, where the delivery is best effort anyway — and situations in which if the recipient didn’t get the message, I don’t really care.

We’ll see how deep we’ll integrate this concept… but for now I’m suggesting we don’t extend the language even more, and we could use some specific return type that the transport “knows”.

1 Like

All of the public methods exposed by an actor could be considered its location-agnostic API, so it could be interesting to consider a reverse rule for them - instead of requiring all of the accessible (in a use-site sense) methods be annotated as distributed, consider them distributed by default and allow specifying local for the ones that could only be reached specifically on a "local" version of the actor.

If a method has to be annotated as public, e.g. to allow an actor to satisfy a protocol requirement, it should also be annotated as local to avoid publishing it as part of a location-agnostic actor interface.

I think this brings semantics closer to nonisolated of non-distributed actors and allows to clearly define that all of the properties of a distributed actor are implicitly local as well as all of its "internal" methods which are not part of an actor interface and cannot be accessed from its instance.

5 Likes

I'd like to share some of my thoughts on this topic, which might or might not turn into something useful, but I think it is still helpful to post them here.

  • I consider actor, transport, and serialization to be three separate concepts/components which together compose a distributed actor system.

  • A transport could be de-coupled with an actor. Sometimes it's useful to make an actor reachable via multiple different transports and/or protocols e.g. all of the actors are reachable at a given IP:port in local-DC but only some are reachable cross-DC or globally.

    I think this fits well with the example already mentioned in the discussion here, where there is one primary or coordinator and N secondary replicas, and the one primary or coordinator could be reachable to clients.

    Instead of requiring a "transport" to be passed to a distributed actor, we could split some functionality from it into a "manager" which would handle actor state (up/down) and deal with identities. So the "transport" would only be responsible for serialization and data transfers. The idea here is that the manager is a "discovery" mechanism which could operate on its own transport/protocol and connect actors to transports. This is useful because discovery mechanisms are usually a separate concern in practice, and actors do not always want to be accessible to other actors:

protocol ActorManager {
  associatedtype ActorType
  
  // Note: "identity" type should be made a property of actor manager...
  
  func register(_: ActorType) 
  
  func identify(_ range: ActorType) -> RangeID
   
  /// Retrieve an actor responsible for the given range id
  func resolve(id: RangeID) throws -> ActorType 
   
  func makeReachable(_: ActorType, via: [ActorTransport]) 
}

Based on this manager interface, let's define a Ring of tokens:

class Ring : ActorManager {
   // Just for presentation purposes to make sure that this actor handles only TokenRange
   typealias ActorType = TokenRange 
   
   // init(...)
   
   // Registers a range of tokens in the system. 
   //
   // The range is local until marked as reachable via calling `makeReachable`.
   func register(_: TokenRange) { ... }
   
   // Global unique identifier of the range.
   func identify(_ range: TokenRange) -> RangeID { ... }
   
   // Retrieve an actor responsible for the given range id
   func resolve(id: RangeID) throws -> TokenRange { ... }
   
   // Makes the given actor reachable from other processes/nodes via the given
   // set of protocols.
   func makeReachable(_ range: TokenRange, via: [ActorTransport]) {
      ...
   }
}

Each of the tokens can hold some "data". Tokens are organized into contiguous ranges via the TokenRange distributed actor:

distributed actor TokenRange {
  let ring: Ring
  var storage: [Token: Data]

  init(ring: Ring, range: (Token, Token)) {
     self.storage = ... // <- One can imagine a situation when existing data has to be loaded, which could take some amount of time
     
     // Make sure that the system knows about this "local" range, similar to `actorReady` but not exactly
     defer {
        ring.register(self)
     }
  }
  
  [distributed] func read(at loc: Token) [async throws] -> Data? { ... }
  [distributed] func write(to loc: Token, contents: Data) [async throws] -> Result<Data, Error> { ... }
}

The system so far could look something like this:

// ~~~~~~~~~ Node 1 ~~~~~~~~~~~
let ring = Ring(<some seeds>)

let range1 = TokenRange(ring, range: (0, 255))
let range2 = TokenRange(ring, range: (255, 500))

// ... Do some data loading or other bootstrap operations in parallel ....

ring.makeReachable(range1, via: [.tcp(host: ..., port: ...), .ipc(...)])
ring.makeReachable(range2, via: [.tcp(host: ..., port: ...), .ipc(...)])


// ~~~~~~~~~ Node 2 ~~~~~~~~~~~

let ring = Ring(<some seeds>)

let range3 = TokenRange(ring, range: (500, 0))

ring.makeReachable(range3, via: [.tcp(host: ..., port: ...), .ipc(...)])

So the Ring knows about all three token ranges, where they are located, and how they could be accessed.
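As a rough illustration of the bookkeeping this implies, here is a toy directory that answers "which range owns this token, and where does it live?". `RingDirectory`, `Location`, and the half-open, wrap-around interpretation of `(lower, upper)` are my assumptions for the sketch; `RangeID` is given a made-up shape because the post does not define it.

```swift
typealias Token = Int

/// Hypothetical identifier: the range's bounds double as its id.
struct RangeID: Hashable {
    let lower: Token
    let upper: Token

    /// Assumes half-open ranges [lower, upper); upper <= lower wraps around the ring.
    func contains(_ token: Token) -> Bool {
        lower < upper ? (token >= lower && token < upper)
                      : (token >= lower || token < upper)
    }
}

enum Location: Equatable {
    case local
    case remote(node: String)
}

/// Toy stand-in for the bookkeeping a `Ring` would do.
struct RingDirectory {
    private var locations: [RangeID: Location] = [:]

    mutating func register(_ id: RangeID, at location: Location) {
        locations[id] = location
    }

    func owner(of token: Token) -> (id: RangeID, location: Location)? {
        locations.first { $0.key.contains(token) }
            .map { (id: $0.key, location: $0.value) }
    }
}

// View from Node 1: range1 and range2 are local, range3 lives on Node 2.
var directory = RingDirectory()
directory.register(RangeID(lower: 0, upper: 255), at: .local)
directory.register(RangeID(lower: 255, upper: 500), at: .local)
directory.register(RangeID(lower: 500, upper: 0), at: .remote(node: "node-2"))
```

With this view, `directory.owner(of: 765)` lands in the wrap-around range on the other node, which is exactly the case where a remote proxy would be handed out.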

We could also consider a slice command that returns an iterator for the data for a given range e.g.:

extension TokenRange {
  [distributed] func slice(range: (Token, Token)) [async throws] -> AsyncIterator<(Token, Data)> {
     ... 
  }
}

Such an operation requires inter-range collaboration (because the specified range could span multiple actors), so it could take advantage of the fact that multiple ranges could be co-located in the same process/machine and use an optimized transport: in our example, IPC between range1 and range2.

Now, I'd like to bulk load the data into the ring; for that I'd define another distributed actor called BulkLoader and its manager:

struct LoaderManager : ActorManager {
   typealias ActorType = BulkLoader
   
	...
}

distributed actor BulkLoader {
   init(manager: LoaderManager, ...) {
      ...
   }
   
   [distributed] func load(ring: Ring, data: [Token: Data]) [async throws] -> Result<Bool, Error> {
     ...
   }
}

I could start with one local loader which is simple to implement but could be slow:

// Information about the token range we are about to load data into
let ring = Ring(<some seeds>)
let loaderManager = LoaderManager(<some seeds>) 

let loader = BulkLoader(loaderManager)
loader.load(ring, [
    (0, "a"),
  (257, "b"),
  (765, "c")
])

In this case the bulk loader is not reachable to others at all (but still discoverable in the same process), and takes advantage of the fact that the connection between actors is duplex, which makes it possible for the remote peer to send data back and maintain the already established connection.

Another possibility is to distribute loaders to different machines and feed them data so it could be loaded locally via IPC:

// ~~~~~~~~~ Node 0 ~~~~~~~~~~~

let ring = Ring(<some seeds>)
// Information about all of the bulk loaders that currently operate on local/remote machine(s)
// which might or might not be co-located with token ranges
let loaderManager = LoaderManager(<some seeds>) 

let partitionedData: [RangeID: [Token: Data]] = ... // partition data based on known ranges

// Could be done in parallel too
for partition in partitionedData {
  let loader = try loaderManager.resolve(partition.key)
  await loader.load(ring, partition.value)
}

The bulk loaders and token ranges don't really know anything about each other and might not be reachable the same way; two connections are not required to send/receive the data. If it were possible for the ring to detect that a bulk loader is co-located with a particular token range, then it could use the .ipc transport instead of .tcp to transfer the data between them, which is a big advantage.
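The co-location check could be as simple as the following sketch. `Placement`, `pickTransport`, the node names, and the host address are placeholders I made up; how a real ring would learn about placements is left open.

```swift
enum Transport: Equatable {
    case ipc
    case tcp(host: String)
}

/// Hypothetical record of where an actor lives and how it was made reachable.
struct Placement {
    let node: String
    let reachableVia: [Transport]
}

/// Prefer IPC when both ends are on the same node; otherwise fall back to the first non-IPC transport.
func pickTransport(from caller: Placement, to callee: Placement) -> Transport? {
    if caller.node == callee.node, callee.reachableVia.contains(.ipc) {
        return .ipc
    }
    return callee.reachableVia.first { $0 != .ipc }
}

// range1 was made reachable via both TCP and IPC (placeholder host).
let range1 = Placement(node: "node-1", reachableVia: [.tcp(host: "10.0.0.1"), .ipc])
// Loaders do not need to be reachable themselves to initiate a connection.
let coLocatedLoader = Placement(node: "node-1", reachableVia: [])
let remoteLoader = Placement(node: "node-0", reachableVia: [])
```

Here `pickTransport(from: coLocatedLoader, to: range1)` chooses `.ipc`, while the loader on another node falls back to TCP.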

  • Each of the actors could specify serialization interface explicitly or default to one i.e. Codable via a typealias.

The idea here is that serialization is separate from transport, and the same serialization method could be used with a variety of different transports, e.g. sending protobufs over tcp/udp/ipc. We would still want to make it possible to statically check the location-agnostic methods exposed by the actor, to make sure that everything is serializable instead of waiting for the system to crash at runtime.
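A small sketch of that separation, using Foundation's JSON and property list coders as two interchangeable formats. `WireFormat`, `LoopbackTransport`, `send`, and `WriteRequest` are hypothetical names for illustration; the transport here only ever sees bytes.

```swift
import Foundation

/// Hypothetical serialization interface, independent of any transport.
protocol WireFormat {
    func encode<T: Encodable>(_ value: T) throws -> Data
    func decode<T: Decodable>(_ type: T.Type, from data: Data) throws -> T
}

struct JSONFormat: WireFormat {
    func encode<T: Encodable>(_ value: T) throws -> Data { try JSONEncoder().encode(value) }
    func decode<T: Decodable>(_ type: T.Type, from data: Data) throws -> T {
        try JSONDecoder().decode(type, from: data)
    }
}

struct PropertyListFormat: WireFormat {
    func encode<T: Encodable>(_ value: T) throws -> Data { try PropertyListEncoder().encode(value) }
    func decode<T: Decodable>(_ type: T.Type, from data: Data) throws -> T {
        try PropertyListDecoder().decode(type, from: data)
    }
}

/// Toy transport: it moves bytes and knows nothing about how they were produced.
struct LoopbackTransport {
    func roundTrip(_ bytes: Data) -> Data { bytes }
}

/// The `Codable` constraint is the static check: a non-serializable argument fails to compile.
func send<Message: Codable>(_ message: Message, as format: any WireFormat,
                            over transport: LoopbackTransport) throws -> Message {
    let bytes = try format.encode(message)
    return try format.decode(Message.self, from: transport.roundTrip(bytes))
}

struct WriteRequest: Codable, Equatable {
    let token: Int
    let contents: String
}

let request = WriteRequest(token: 257, contents: "b")
let viaJSON = try send(request, as: JSONFormat(), over: LoopbackTransport())
let viaPlist = try send(request, as: PropertyListFormat(), over: LoopbackTransport())
```

Swapping the format does not touch the transport, and passing a type that is not `Codable` to `send` is rejected at compile time rather than at runtime.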

  • Enabling proxies via location-agnostic actor interface instead of inline code generation

This builds on the previous point about decoupling transport from an actor, because if we do that it wouldn't be possible to generate the thunks as described in the pitch.

To enable this, the compiler could split the original actor declaration into:

  • A protocol that specifies location-agnostic interface of a distributed actor (only distributed methods are visible there);
  • A location-agnostic actor interface (this is what gets returned from ActorManager);
  • A local actor (with properties and internal methods as declared by the user);
  • A remote proxy that doesn't expose anything except "public" distributed entry points.

Let's go back to our example from above, and see how TokenRange could be transformed into this scheme:

The protocol to which local/remote token ranges have to conform:

protocol _TokenRange : DistributedActor {
  typealias SerializableVia = Codable
  
  [distributed] func read(at loc: Token) async throws -> Data?
  [distributed] func write(to loc: Token, contents: Data) async throws -> Result<Data, Error>
}

The location-agnostic interface that would be returned from Ring based on some RangeID:

// `class` here because we don't want to have to hop to another executor just to then enter `impl` context.
class TokenRange {
   private let impl: some _TokenRange
   
   // Preserves interface initializers
   public init(ring: Ring, range: (Token, Token)) {
     self.init(impl: _LocalTokenRange(ring: ring, range: range))
   }
   
   [hidden] init<T: _TokenRange>(<identity?>, impl: T) {
     self.impl = impl
   }
   
   func read(at loc: Token) async throws -> Data? {
     return try await impl.read(at: loc)
   }
   
   func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
     return try await impl.write(to: loc, contents: contents)
   }
}

Local version of the distributed actor TokenRange that has properties and all of the internal methods:

[hidden] actor _LocalTokenRange : _TokenRange {
  // Note that "local" actor doesn't need `SerializableVia`
  
  let ring: Ring
  var storage: [Token: Data]

  init(ring: Ring, range: (Token, Token)) {
    ...
  }
  
  func read(at loc: Token) async throws -> Data? {
    // code from user version of `TokenRange`
  }
  
  func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
    // code from user version of `TokenRange`
  }
}

And the remote version, which could use either nonisolated or @unsafe methods to witness methods of _TokenRange to avoid an extra hop:

[hidden] actor _RemoteTokenRange : _TokenRange {
  typealias SerializableVia = Codable

  let transport: ActorTransport
  
  init(id: <identifier>, transport: ActorTransport) {
     self.transport = <initialize transport using `SerializableVia` type>
  }
  
  nonisolated func read(at loc: Token) async throws -> Data? {
    return try await transport.call(\.read, with: [loc], ...)
  }
  
  nonisolated func write(to loc: Token, contents: Data) async throws -> Result<Data, Error> {
    return try await transport.call(\.write, with: [loc, contents], ...)
  }
}

Now, Ring could implement resolve in the following manner:

struct Ring : ActorManager {
  func resolve(id: RangeID) throws -> ActorType {
     if let transport = isRemote(id) {
       return ActorType.init([id]?, transport: transport)
     }
     
     return <local instance registered with the system>
  }
}

So when range (500, 0) is requested on Node 1 from our original example, Ring would return a TokenRange instance backed by a _RemoteTokenRange.
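For anyone who wants to poke at the shape of this split, here is a compilable miniature. It deliberately uses plain classes and synchronous methods instead of actors and `async throws`, and every name in it (`TokenRangeAPI`, `TokenRangeHandle`, `ToyRing`, the string ids) is a stand-in I made up rather than what the compiler would synthesize.

```swift
/// Location-agnostic interface; in the scheme above this would be synthesized.
protocol TokenRangeAPI {
    func read(at token: Int) -> String?
}

/// Local implementation: owns the actual storage.
final class LocalTokenRange: TokenRangeAPI {
    private var storage: [Int: String]
    init(storage: [Int: String]) { self.storage = storage }
    func read(at token: Int) -> String? { storage[token] }
}

/// Remote proxy: owns no state, only forwards. The closure stands in for a transport call.
final class RemoteTokenRange: TokenRangeAPI {
    private let forward: (Int) -> String?
    init(forward: @escaping (Int) -> String?) { self.forward = forward }
    func read(at token: Int) -> String? { forward(token) }
}

/// Facade handed to users; they never see which implementation backs it.
struct TokenRangeHandle {
    private let impl: any TokenRangeAPI
    init(impl: any TokenRangeAPI) { self.impl = impl }
    func read(at token: Int) -> String? { impl.read(at: token) }
}

/// Toy manager: ids registered locally resolve to the local instance, all others to a proxy.
struct ToyRing {
    var local: [String: LocalTokenRange] = [:]
    var remoteLookup: (Int) -> String? = { _ in nil }

    func resolve(id: String) -> TokenRangeHandle {
        if let range = local[id] { return TokenRangeHandle(impl: range) }
        return TokenRangeHandle(impl: RemoteTokenRange(forward: remoteLookup))
    }
}

// Node 1: "range1" is local; "range3" lives elsewhere, so it resolves to a proxy.
let node2Storage = [765: "c"]
let ring = ToyRing(local: ["range1": LocalTokenRange(storage: [0: "a"])],
                   remoteLookup: { node2Storage[$0] })
```

Callers use `ring.resolve(id:)` the same way in both cases; only the backing implementation differs.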

We could apply all these changes to the chat room example mentioned in the pitch. The main function would become a lot more straightforward because only the ChatRoom actor needs to be reachable to the Chatter, since all communication is multiplexed through it, and logically Chatter actors shouldn't really be reachable to each other or even to the chat room itself (unless they initiate the connection with the chat room), e.g.:

class Chat : ActorManager {
   typealias ActorType = ChatRoom
   
   ...
}

class Chatters : ActorManager {
   typealias ActorType = Chatter
   
   ...
}

In main:

let chat = Chat(<seeds>)

let room = ChatRoom(topic: "Cute Capybaras")
chat.makeReachable(room, via: [.tcp(host: ..., port: ...)])

let chatters = Chatters()

let alice = Chatter(manager: chatters)
let bob = try chatters.resolve(<bob id>)
let charlie = Chatter(manager: chatters)

for chatter in [alice, bob, charlie] {
   Task {
     let room = try chat.resolve(room.id) // this would resolve as a local room every time
     try await chatter.join(room: room)
   }
}

1 Like

Small edit to the SerializableVia interaction with a transport in the _Remote* version: the type of serialization would have to be passed to the transport per remote call, so the initializer becomes something like this:

init<T: ActorTransport>([id], transport: T) {
   self.transport = transport
}

and transport.call(…) gains an argument to pass serialization format to use for that particular call, which aligns with the idea that transport should be serialization agnostic.

1 Like

Thanks a lot Pavel! We discussed these things off-thread for the entire week and there are some really good ideas here, and others that don't quite fit -- we'll be posting proposals zooming in on the implementation details shortly and will take into account the ideas from here :slight_smile:

Short summary:

  • implicit distributed funcs: we tried them this week (and discussed with Pavel and others) -- they won't work
    • I'll post the "distributed actor isolation" proposal today and this will be discussed there.
  • implementation approach with synthesis of protocol, _Remote decl and _Local decl -- I remain worried about this approach... the protocol synthesis and the types mean that we would not really be able to hide them, and users would end up knowing about _Local... etc.
    • We'll dig into this a bit more though for the "distributed actor runtime & serialization" proposal.

It seems ActorTransport was terribly named and caused all kinds of confusion -- it shall from here onwards be known as DistributedActorSystem, which is the same type, just better named. It is indeed like the ActorManager that you have in your writeup, and not "the actual wire transport". Apologies for the naming there; it confused a lot of the reviewers :slight_smile:

Very much agreed on keeping the distributed actors agnostic of actual wire transport, and we'll even be able to handle more of the serialization for users. Thank you for your great ideas on this @xedin ~ especially the way we should de-couple the serialization things.

4 Likes

Okay, here's the promised Distributed Actor Isolation proposal.

2 Likes

Hey, speaking of serialization: one of the nice conveniences of GRPC is being able to serialize any GRPC object simply. That’s very useful, both for saving things from the request and for unit testing. So it would be good to have it show up in examples and to make it simple to do.

Also, related to the “local XPC service” case: doing that implies more than one service, and wanting to do the same registry and lookup locally as with clusters. For submodules with low call frequency connections, running them this way has huge robustness benefits, especially with a registry service that auto-restarts services that died.

(See coreos map of interdependent services launching each other constantly for related humor.)

Also, until namespaces come along, dist actors are one way to package larger Swift services with lots of types without worrying about making too much public in the API: you just build the whole thing as a service and use dist RPC to publish to clients.

Our system does that today with GRPC on platforms that allow services to run in the background (all-iOS?), and it’s nice because our systems look the same running remotely or locally, just with a different IP. And internally it’s all Swift code, so we get simple end-to-end testing that is mostly representative of all platforms.

Still would like namespaces though: it’s good to have encapsulation boundaries separate from “visible to user” boundaries, and modularization is good to enable separately from visibility to the “user”.

Hey, and also: some of the lifecycle stuff belongs in Swift actors 2.0, perhaps. With our actor stuff we found we needed:

  • “wait for other actor with name”,
  • “stream outputs to other actor”,
  • “register to receive events of type”

All of these were very useful, even necessary. Perhaps dist actors are equivalent to actors in the end, besides more transport options… And as I said earlier, an XPC-like or even just Codable transport lets us rig up isolates and consider turning off atomics in ARC. The industry is moving away from cross chip syncing “all the damned time” anyway ;-).
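To show roughly what two of those needs look like, here is a toy sketch of “wait for other actor with name” and “register to receive events of type”. `ToyRegistry` and everything in it is hypothetical, and callbacks stand in for what would be `await` points in real actor code.

```swift
/// Hypothetical registry; callbacks stand in for suspension points.
final class ToyRegistry {
    private var instances: [String: AnyObject] = [:]
    private var waiters: [String: [(AnyObject) -> Void]] = [:]
    private var handlers: [ObjectIdentifier: [(Any) -> Void]] = [:]

    /// "Wait for other actor with name": runs now if registered, otherwise on registration.
    func whenAvailable(_ name: String, _ body: @escaping (AnyObject) -> Void) {
        if let instance = instances[name] {
            body(instance)
        } else {
            waiters[name, default: []].append(body)
        }
    }

    func register(_ instance: AnyObject, as name: String) {
        instances[name] = instance
        for waiter in waiters.removeValue(forKey: name) ?? [] { waiter(instance) }
    }

    /// "Register to receive events of type": handlers are keyed by the event's static type.
    func subscribe<Event>(to _: Event.Type, _ handler: @escaping (Event) -> Void) {
        handlers[ObjectIdentifier(Event.self), default: []].append({ event in
            if let typed = event as? Event { handler(typed) }
        })
    }

    func publish<Event>(_ event: Event) {
        for handler in handlers[ObjectIdentifier(Event.self)] ?? [] { handler(event) }
    }
}

struct Joined { let name: String }
final class Room {}

let registry = ToyRegistry()
var log: [String] = []
registry.whenAvailable("room") { _ in log.append("room is up") }
registry.subscribe(to: Joined.self) { log.append("joined: \($0.name)") }
registry.register(Room(), as: "room")
registry.publish(Joined(name: "alice"))
registry.publish("nobody subscribed to String")  // silently dropped
```

The waiter fires only once the room is registered, and the `String` event is dropped because nothing subscribed to that type.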
2 Likes