Beginner advice for Distributed, DistributedCluster modules

Hello :wave:

I am very interested in what advice you can offer someone who is just getting started with the Distributed and DistributedCluster modules. I am having some trouble trying to understand what pieces of these modules I need in order to get two instances of the same program communicating with each other (more details below).

For context, my background when it comes to distributed systems is in the Elixir/Erlang ecosystem, where there are modules that you just drop in and very easily get a clustered set of computers (for reference, see the libcluster v3.3.3 documentation) sending messages between actors via some TCP or UDP communication layer.

Admittedly my knowledge of distributed systems is quite shallow so please let me know if the maturity of these modules is not ready for "beginners".

To ground this a bit, I am working on Fly.io's Gossip Glomers challenges, specifically the "Efficient Broadcast" challenge (Challenge #3d: Efficient Broadcast, Part I · Fly Docs). The premise is that 25 instances of your program are started, and nodes in this cluster receive "broadcast" messages. These messages should then be gossiped to the receiving node's neighbors. The tools used to test the correctness of your system offer different topology structures for the nodes (e.g., a tree structure with a maximum of 4 child nodes per node).
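To make the gossip idea concrete, here is a minimal in-memory sketch in plain Swift. It is only an illustration under my own assumptions: the function names `treeTopology` and `gossip` are hypothetical, there is no networking, and it does not use the challenge's message protocol or the DistributedCluster API. Each node forwards a message once to every neighbor except the one it heard it from.

```swift
// Toy model: gossip one broadcast through a tree topology, in memory only.
// All names are hypothetical; nothing here talks to a real cluster.

/// Builds a tree over node ids 0..<count in which every node has at most
/// `maxChildren` children. Returns an undirected adjacency list.
func treeTopology(count: Int, maxChildren: Int) -> [Int: Set<Int>] {
    precondition(maxChildren > 0, "maxChildren must be positive")
    var neighbors: [Int: Set<Int>] = [:]
    for id in stride(from: 0, to: count, by: 1) { neighbors[id] = [] }
    for child in stride(from: 1, to: count, by: 1) {
        let parent = (child - 1) / maxChildren
        neighbors[parent, default: []].insert(child)
        neighbors[child, default: []].insert(parent)
    }
    return neighbors
}

/// Spreads a message from `origin`. A node forwards to all neighbors except
/// the sender it received the message from. Returns which nodes saw the
/// message and how many sends were needed.
func gossip(from origin: Int, neighbors: [Int: Set<Int>]) -> (seen: Set<Int>, sends: Int) {
    var seen: Set<Int> = [origin]
    var queue: [(node: Int, sender: Int?)] = [(node: origin, sender: nil)]
    var sends = 0
    while !queue.isEmpty {
        let (node, sender) = queue.removeFirst()
        for peer in neighbors[node, default: []] where peer != sender {
            sends += 1
            if seen.insert(peer).inserted {
                queue.append((node: peer, sender: node))
            }
        }
    }
    return (seen, sends)
}

let topology = treeTopology(count: 25, maxChildren: 4)
let result = gossip(from: 7, neighbors: topology)
print(result.seen.count, result.sends) // prints "25 24"
```

In a tree, every edge carries the message exactly once, which is why 25 nodes need 24 sends in this toy model.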

Here is my implementation of Challenge 3d: gossip-glomers/swift/EfficientBroadcast at main · charlieroth/gossip-glomers · GitHub. I am very new to Swift, so this is probably not the most idiomatic Swift. Please be patient if it hurts your eyes :joy:

The file Node.swift is where the action is: gossip-glomers/swift/EfficientBroadcast/Sources/Node.swift at main · charlieroth/gossip-glomers · GitHub

I have seen this challenge solved without introducing additional complexity like a clustered system, but I thought it would be a great opportunity to try out the Distributed and DistributedCluster modules. I am struggling to get started with this, though. Ideally, I would like to create a cluster, have each node join this cluster when it starts up, and be able to gossip messages back and forth like the challenge describes.

Are there "drop-in" modules that make clustering, node discovery and communication possible? Or examples that I can learn from since I do want to eventually learn the details of all of this.

I appreciate any advice or help :slightly_smiling_face:

That's exactly what DistributedCluster (I guess you're referring to apple/swift-distributed-actors, the peer-to-peer cluster implementation for Swift Distributed Actors) should do. To sum up: Distributed is a set of language features, like distributed actors and the actor system abstraction, with the help of which one can build a distributed system. DistributedCluster is, as the name says, a peer-to-peer cluster actor system implementation (mostly built by @ktoso). I think the closest comparison would be Akka rather than Erlang/Elixir, but the ideas are the same anyway with actors/processes.

Though I would say that even though DistributedCluster does have the basics and documentation, it's not yet at the level of Erlang (which is actually a more than 30-year-old language!). You can already find some basic examples in the documentation. Also, I have an example Swift chat app which showcases DistributedCluster.

Btw, covering the Fly challenges might be a good thing to try :thinking: I never thought about that, need to check. :slightly_smiling_face:

Thanks for the reply! I am looking at some of your repositories that use distributed actors, and I'm realizing that I probably need to re-read the docs a couple of times and dive into the code of apple/swift-distributed-actors to get the vocabulary straight in my head.

Hi there!
That's a really really cool idea to try to implement using the cluster library.

As @jaleel said, DistributedCluster is the equivalent of libcluster or of akka-cluster in other languages. It provides the basic building blocks, so some higher-level abstractions and simplifications are currently not implemented... I don't think we should be hitting anything that prevents implementing those challenges though!

I'd be very interested in helping you out and will have a look myself as well. I'm sorry the documentation isn't as complete as one might wish for - perhaps we can use your experience to report back and we'll improve the docs then though :slight_smile:

You can also reach out on the open source slack where we have a swift-distribute channel :slight_smile: Or we can keep the comms to this forum, either works! I'd be happy to help out and will make some time to read up on the challenges myself as well, sounds like great stuff to implement using distributed actors!

I did join the Slack community so I can continue any discussions there :slightly_smiling_face:

To try and explain my confusion for any potential future readers, and @ktoso please correct me if I am wrong at any point...

When building a distributed cluster in Erlang/Elixir, you utilize the Distribution Protocol (erts v15.0) in some capacity, which handles the node handshakes and connective tissue. This means you generally don't worry about writing the code for connecting or disconnecting nodes; you just write the callbacks to be informed of cluster membership changes. Therefore, when setting up a cluster, you specify the "names" and ports of the nodes you want to run in the system, and the "cluster system" is always available to receive new nodes. It seems like this behavior is a result of the topological structure of Erlang's supervision approach.

In contrast, with the DistributedCluster module, you must first create the "root" ClusterSystem, and then other nodes must join this "root" ClusterSystem. For example, if I have a CLI application

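import ArgumentParser
import DistributedCluster
import Foundation

@main
struct Playground: AsyncParsableCommand {
    @Option var port: Int
    @Option var seedPort: Int?
    @Option var name: String

    mutating func run() async throws {
        // Every instance binds its own ClusterSystem on localhost.
        let system = await ClusterSystem(name) { settings in
            settings.bindHost = "127.0.0.1"
            settings.bindPort = port
        }

        // Only instances started with --seed-port try to join another node.
        if let seedPort {
            system.cluster.join(host: "127.0.0.1", port: seedPort)
            // ensureCluster is a helper defined elsewhere in the project.
            try await ensureCluster(system, within: .seconds(10))
        }

        // PlaygroundNode is a distributed actor defined elsewhere in the project.
        let nodeA = await PlaygroundNode(actorSystem: system)

        // Ping at a fixed interval that is picked randomly once at startup.
        let pingTask = Task {
            let randomSleep = Int.random(in: 500..<1000)
            while true {
                try await nodeA.ping(message: UUID().uuidString)
                try await Task.sleep(for: .milliseconds(randomSleep))
            }
        }

        // Stop pinging and shut the system down after 20 seconds.
        Task {
            try await Task.sleep(for: .seconds(20))
            pingTask.cancel()
            try system.shutdown()
        }

        try await system.terminated
    }
}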

and I wanted to run two instances of this program, I would have to start the first instance as the "seed" node and then when I start the second instance, I would have to specify the second instance as NOT the "seed" node so that they join properly. If you start two ClusterSystems on the same port and host name you get errors.

Now of course the next step in building something like the code above would be to utilize apple/swift-service-discovery (a service discovery API for Swift), and perhaps this would allow me to get something closer to the Erlang/Elixir way?

Overall, I am excited to keep exploring the DistributedCluster library and see what I can build, but yes, the documentation is lacking on the "getting started" side, and maybe that is a good thing for now. I would love to contribute examples to the documentation once I have some more confidence in being able to explain the machinery.

Ah I see what you mean... I'll admit I've forgotten about this part of erl on localhost, it's been like a decade since I poked around with erlang in practice :slight_smile:

I can explain what's going on then... and let's use this to improve the docs while at it.

I'll post here and then follow up with documentation improvements.

I think this will be of benefit to everyone interested in distributed systems in Swift. I'd also love to take any contributions you might want to come up with; thank you in advance! I look forward to using these challenges as a reason to drive our docs and user experience to the next level :slight_smile:


I can explain a bit what's happening first I guess.

First, there is no "privileged" node in the ClusterSystem, they're all equal. So the "model" how erlang nodes, and swift cluster nodes work is really the same -- it's a bunch of nodes listening on some tcp ports.

I see what you're referring to though, and it's an interesting thought, but we'd have to write a daemon (like erlang does) or have some external synchronization mechanism... Long story short:

Erlang does the same thing, but thanks to some tricks, you may not have even realized. Erlang starts nodes on a randomized port, and when you use short names e.g. erl -sname a and erl -sname b it binds them to the following ports:

// erlang
-> % epmd -d -names
epmd: up and running on port 4369 with data:
name b at port 53335
name a at port 53282

^ This is perhaps an idea we should totally steal... Even Akka didn't have this, so we could definitely have a leg up and offer as good an experience as erl here.

The erlang port mapper runs on port 4369 by default, and since there is only a single one of them, all short-name nodes connect to it.
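The role the port mapper plays here can be sketched as a tiny name-to-port registry. This is a toy, in-memory model in plain Swift with hypothetical names (`ToyPortMapper`, `register`, `lookup`, `peers`); it is not epmd's protocol and not anything that exists in DistributedCluster.

```swift
// Toy stand-in for a port-mapper style registry: one well-known place
// that remembers which short name listens on which (randomized) port.
struct ToyPortMapper {
    private(set) var ports: [String: Int] = [:]

    /// Registers `name` at `port`. Returns false if the name is already taken,
    /// since short names have to be unique on one host.
    @discardableResult
    mutating func register(name: String, port: Int) -> Bool {
        guard ports[name] == nil else { return false }
        ports[name] = port
        return true
    }

    /// Looks up the port of a single named node.
    func lookup(_ name: String) -> Int? {
        ports[name]
    }

    /// Everything a freshly started node would need in order to join the others.
    func peers(excluding name: String) -> [String: Int] {
        ports.filter { $0.key != name }
    }
}

var mapper = ToyPortMapper()
mapper.register(name: "a", port: 53282)
mapper.register(name: "b", port: 53335)
print(mapper.peers(excluding: "a")) // prints ["b": 53335]
```

The point of the sketch is only that nodes no longer need to be told each other's ports: they ask the one well-known registry.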

We could do the same: provide a clusterd which binds on a well-known port, and then we'd spawn all nodes in "local clusterd discovery" mode :slight_smile: We have plug-in ability for node discovery (see the documentation), so we'd just set cluster.discovery = .clusterd -- I very much like this idea, here's a ticket for it.

The erlang port assignments shown above are something we can find out by inspecting the erlang port mapper daemon (epmd). It's true we don't have an equivalent of that, so "connecting the local ones" is up to us. I think this is as simple as having a small "app node" which binds on a "well known port".

That is really the same in the swift cluster -- just that we don't have the "daemon equivalent" so at startup yes you have to write a line or two of "join" commands, but after that it's all just cluster events.

To be clear, this would be the case for erlang as well -- that's just how tcp ports work.

And to repeat this again, there is no strict requirement to be "THE seed node" -- literally all nodes can try to join all other ones -- even "racing" the joining, and the cluster will form properly.
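As a rough intuition for why racing joins are harmless, here is a toy model in plain Swift. It is purely my own simplification: `ToyCluster` is a hypothetical type that treats a join as merging two membership views, which is not how the real cluster's membership protocol is implemented. It only shows that, under that assumption, the order of join attempts does not change the outcome.

```swift
// Toy membership model: a join merges the views of both sides.
// Hypothetical simplification, not the library's actual protocol.
struct ToyCluster {
    /// Membership view per node, keyed by the node's port.
    private(set) var views: [Int: Set<Int>] = [:]

    init(ports: [Int]) {
        for port in ports { views[port] = [port] }
    }

    /// A join attempt from node `a` towards node `b`.
    /// Joining yourself, or an unknown node, is a no-op.
    mutating func join(from a: Int, to b: Int) {
        guard a != b, let viewA = views[a], let viewB = views[b] else { return }
        let merged = viewA.union(viewB)
        for member in merged { views[member] = merged }
    }
}

let ports = [7337, 7338, 7339]

// Every node tries to join every seed port, including its own.
var attempts: [(from: Int, to: Int)] = []
for from in ports {
    for to in ports {
        attempts.append((from: from, to: to))
    }
}

var cluster = ToyCluster(ports: ports)
for attempt in attempts.shuffled() {
    cluster.join(from: attempt.from, to: attempt.to)
}

let formed = cluster.views.values.allSatisfy { $0 == Set(ports) }
print(formed) // prints "true"
```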

You can have them start concurrently; there is no ordering requirement. I'd structure this as:

app --port 7337 --name a --seed-port=7337,7338,7339
app --port 7338 --name b --seed-port=7337,7338,7339
app --port 7339 --name c --seed-port=7337,7338,7339
// however many nodes you want here in "seed ports" ^^^

You see I'm being lazy and didn't even filter out the "self" port from the list, because the cluster will know "no need to join myself", and in code I'd do this:

let system = await ClusterSystem(name) { ... }
for port in seedPorts {
  // joining our own port is harmless, the cluster skips "self"
  system.cluster.join(host: "127.0.0.1", port: port)
}

// this waits until this node becomes "up" i.e. has "joined"
// since we have "joining" -> "up" node status
try await system.cluster.joined(within: .seconds(5))

^ note also that the try await cluster.joined is a bit simpler than what you have with the ensure... I believe this is again insufficient docs perhaps -- you probably saw this in the distributed philosophers which spawn many nodes from the same process. We should improve docs on this as well, and I made a ticket for it.
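The join loop above expects a list of ports, while the command line passes them as one comma-separated value. A small helper along these lines could bridge the two. This is a sketch in plain Swift; `parseSeedPorts` is a hypothetical name, and how the flag is actually declared with ArgumentParser is left out.

```swift
import Foundation

/// Parses a value such as "7337,7338,7339" into a list of ports.
/// Blank or non-numeric entries and out-of-range values are skipped,
/// and duplicates are dropped while keeping the original order.
func parseSeedPorts(_ raw: String) -> [Int] {
    var seen = Set<Int>()
    return raw
        .split(separator: ",")
        .compactMap { Int($0.trimmingCharacters(in: .whitespaces)) }
        .filter { (1...65535).contains($0) }
        .filter { seen.insert($0).inserted }
}

let seedPorts = parseSeedPorts("7337,7338,7339")
print(seedPorts) // prints "[7337, 7338, 7339]"
```

The node's own port is deliberately not filtered out here, matching the "lazy" approach described above.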

:bulb: I think your feedback is actually very valuable... we come across as much more difficult to use and as having some special nodes, but that's not the technical reality of it. Erlang's superior UX just makes our setup seem much harder than it actually is... I'd definitely be on board with doing the helper daemon process, so that we would then run:

clusterd & // or maybe your "app --clusterd"?
./node --name A
./node --name B

and their join code would become:

ClusterSystem(name) { 
  $0.cluster.discovery = .clusterd
}

This post got pretty long so I'll make another one for the other question.

Ah, this is probably me spoiling the term by using it :sweat_smile: we were chatting yesterday. Also didn't mean seed node like in Akka, more like you can start one node and then connect to it.

Yeah, this would be interesting to check. :thinking:

One note about the let nodeA = await PlaygroundNode(actorSystem: system) bit -- this is a local distributed actor, located in this process/node.

So when you'll want the "other node" to find it you'll write something like documented over here in the Receptionist docs (maybe we should rename this to "discovering actors"):

distributed actor Echo {
  typealias ActorSystem = ClusterSystem

  @ActorID.Metadata(\.receptionID)
  var receptionID: String

  init(actorSystem: ActorSystem) async {
    self.actorSystem = actorSystem
    self.receptionID = "*" // by convention "all"

    await actorSystem.receptionist.checkIn(self)
  }
}

TODO: We should check why I didn't make it just checkIn automatically when you have receptionID metadata. Now that I look at it... it should just work :thinking: (ticket)

And this is how you discover actors of a given type on any node that has joined the cluster and checked in (somewhat similar to erlang's name registry for PIDs), but notice that these are well typed (!):

let echoKey = DistributedReception.Key(Echo.self)
for await echo in await system.receptionist.listing(of: echoKey) {
  // this lists echo instances connected on all different nodes (!)
  // and keeps the listing loop going forever
  // "listening" for new ones.
}

or you can just do the simple single shot listing call:

      let echos = self.listing(of: Echo.self)

This returns a list of all Echo actors CURRENTLY known in the cluster, so this call can be racy, while the listing sequence is not.
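The difference between the one-shot call and the listing sequence can be illustrated with a toy registry in plain Swift. This is only a sketch: `ToyRegistry`, `snapshot` and `subscribe` are hypothetical names, the code is single-threaded, and it is not the receptionist API. A snapshot misses anything that checks in later, while a subscription replays what is known and keeps reporting new arrivals.

```swift
// Toy registry contrasting a one-shot snapshot with a subscription.
// Hypothetical names; not thread-safe and not the receptionist API.
final class ToyRegistry<Guest> {
    private var guests: [Guest] = []
    private var subscribers: [(Guest) -> Void] = []

    /// One-shot lookup: only what is known at this very moment.
    func snapshot() -> [Guest] {
        guests
    }

    /// Subscription: replays the known guests, then reports later check-ins.
    func subscribe(_ onGuest: @escaping (Guest) -> Void) {
        guests.forEach(onGuest)
        subscribers.append(onGuest)
    }

    /// Records a guest and notifies every subscriber.
    func checkIn(_ guest: Guest) {
        guests.append(guest)
        subscribers.forEach { $0(guest) }
    }
}

let registry = ToyRegistry<String>()
registry.checkIn("echo@a")

let early = registry.snapshot()          // taken before "echo@b" exists
var streamed: [String] = []
registry.subscribe { streamed.append($0) }

registry.checkIn("echo@b")               // arrives after the snapshot

print(early)    // prints ["echo@a"]
print(streamed) // prints ["echo@a", "echo@b"]
```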


So you're definitely raising some right developer experience questions... I'd love to address them all but will have to see how much time I manage to make for this. I'll see what I can do though!

Please keep asking questions and if you'd like to contribute that'd be cool as well.

I appreciate the responses :slight_smile:. Everything you say makes sense and clears up some of the smaller details for me.

To clarify, there is slightly more code in the example I provided, which includes the await system.receptionist.checkIn(self) snippet. I omitted it to keep the post shorter. Here is the entire implementation: gossip-glomers/swift/Playground/Sources/Playground.swift at main · charlieroth/gossip-glomers · GitHub

I will keep on learning and either ask questions on Slack or post here (or in separate posts) if they are applicable to a wider audience. Again, thank you for the feedback and the work you've done with these packages. I'm glad that my confusion is being put to good use :sweat_smile:

I understood what you meant by "seed" node. I was just borrowing the phrasing since it made the most sense to describe the relationships for my implementation. I didn't realize it had meaning in context of Akka. Thank you for your help as well!

Thank you again! I think it’s great to have these discussions and we can perhaps quickly fix up a bunch of the issues :slight_smile:

Please keep it coming! The forums are a good place for this, and Slack works for random quick stuff :)

I really liked the idea and had an hour to spare today so I prototyped this:

Please have a look: ClusterD PoC which serves as seed node by ktoso · Pull Request #1155 · apple/swift-distributed-actors · GitHub


Creating a few nodes in a system would literally be:

> swift-clusterd & # or as a system service

which is a binary we'd prepare, and it basically just does:

await ClusterSystem.startClusterDaemon()

This is the same as epmd, it has to be running somewhere.

EPMD is an external program, implemented in C. Whilst net_kernel:start/1 takes care of creating the net_sup supervisor, it does not actually trigger the EPMD daemon, which has to be started explicitely. I had a look at how EPMD is started when the -sname option is specified in the erl command and - surprise, surprise - I discovered that the epmd program is started via a system() C call.

And applications just do:

    let system = await ClusterSystem(name) { settings in
      settings.discovery = .clusterd
    }

and you're done:

> myapp --name first 
> myapp --name second

they'll join each other.

We could even make it such that by default we look for some --cluster-discovery=clusterd or an env var etc...
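Picking the discovery mode from a flag or an environment variable could look roughly like the sketch below. Everything here is an assumption for illustration: the `DiscoveryMode` enum, the `discoveryMode` function, and the `CLUSTER_DISCOVERY` variable name are hypothetical, and only the `--cluster-discovery=clusterd` spelling comes from the idea above. It is plain Swift and does not touch ClusterSystem settings.

```swift
import Foundation

/// Hypothetical representation of how a node should find its peers.
enum DiscoveryMode: Equatable {
    case clusterd  // ask the local daemon
    case manual    // explicit join calls, as before
}

/// Decides the discovery mode. A --cluster-discovery=... flag wins over the
/// (hypothetical) CLUSTER_DISCOVERY environment variable; anything other than
/// "clusterd" falls back to manual joining.
func discoveryMode(arguments: [String], environment: [String: String]) -> DiscoveryMode {
    let flagPrefix = "--cluster-discovery="
    let fromFlag = arguments
        .first(where: { $0.hasPrefix(flagPrefix) })
        .map { String($0.dropFirst(flagPrefix.count)) }
    let requested = fromFlag ?? environment["CLUSTER_DISCOVERY"]
    return requested?.lowercased() == "clusterd" ? .clusterd : .manual
}

let mode = discoveryMode(
    arguments: CommandLine.arguments,
    environment: ProcessInfo.processInfo.environment
)
print(mode)
```

Passing the arguments and environment in as parameters keeps the decision easy to test without starting a process with special flags.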

Nice, looks simple enough. I appreciate you taking the time to look into this and build out a PoC. I hope to make use of it if/when it lands in main.
