Event sourcing

Hey everyone!

I'm in the process of cracking event sourcing for DA for my needs (e.g. swift-chat), which then want to decouple and create a separate package out of it.
Here are a bit more information on the topic.

So far I'm trying to replicate Akka's EventSourcedBehavior, currently idea is that there will be a protocol:

public protocol EventSourced: Codable, DistributedActor where ActorSystem == ClusterSystem {
  associatedtype State: Codable & Sendable
  associatedtype Command: Codable & Sendable
  associatedtype Event: Codable & Sendable
  var state: State { get set }
  var persistenceId: PersistenceId { get }
  distributed func handle(command: Command) async throws -> Event
  distributed func handle(event: Event) async throws
}

which then can be applied to actors, e.g.

distributed actor SomeActor: EventSourced {
  
  struct State: Codable, Sendable {
    mutating func change() {}
  }
  enum Event: Codable, Sendable {
    case none
  }
  enum Command: Codable, Sendable {
    case none
  }

  let persistenceId: PersistenceId = "Some"
  var state: State = State()  

  distributed func handle(event: Event) { self.state.change() }
  distributed func handle(command: Command) -> Event { Event.none }
}

(note: will continue on improving that, e.g. currently it's not obvious handle(event:) should be applied to state)

But here where I'm struggling a bit, cause in Akka you just call Effect.persist(event) and it will do trick for you to persist event. And as I don't have much experience in Akka—not even sure how it's implemented under the hood. :thinking: currently slowly reading the documentation and code.

What I have in my mind is that in order to declare EventSourced actors you should register EventJournal plugin (like it's done with ClusterSingletons) like $0.plugins.install(plugin: EventJournal(.postrgres(config))) with some magic behind. And then somehow declare func persist(event:) function for actor...

Another question I'm wondering about—I guess this also could be applied to non-distributed actors, where you can just create a separate journal in iOS/macOS/etc and add actors to it. :thinking:

So if anyone have some suggestions or ideas—would be nice to hear. Also if someone can point to some article or implementation would be also nice to check.

1 Like

@ktoso
Also have a question regarding you reply here

especially with macros and the flexibility you get from actorReady so you can store it and register it for "when events arrive we'll call you"

I'm not sure where macros could be applied here, I guess exactly for func persist(event:) logic :thinking:
Also can you eloborate on actorReady function? There are no documentation and not sure how it can be used...

handling events is never "distributed" so you'd skip the distributed here -- an event always arrives from your journal, which is a local actor driving the replay -- skip the distributed from this bit and use the some.whenLocal { $0.handleEvent(event) } to get hold of the actor to be able to call the event handler. (I'd call it handleEvent rather than handle(event:) as well.

I'm not sure I'd do a handle for commands -- Akka needs to do that since it must stuff all messages into a big enum. We don't have to -- we can use normal methods. Basically any distributed call made to an actor is a command. And "during replay" it is events. So you'd have:

distributed actor Kappa: EventSource { 
  distributed func add(num: Int) { // that's a command
    // emit event
   context.emit(AddEvent(num))
  } 
  func handleEvent(...) {}
}

I think I'd pursue this way and see if we're missing language to allow building this.

You can do A LOT of tricks using the actor system and calling things on the actor etc.

Akka Typed specifically :slight_smile: Akka typed works on "return things to the library, and it handles it".

Akka typed implements a big "receive function" and then calls the handler methods, gets the event; calls persist on it, then invokes the handler if i remember correctly.

Yeah you'd register it like the singleton -- this is in order to have one instance of a journal.

Yeah event sourcing can just work on local actors as well -- making them distributed is a step extra so they can be "recovered" on another node and as a developer you don't even care -- it looks and feels as if it was always alive.

1 Like

actorReady is documented on the system type: https://github.com/apple/swift/blob/main/stdlib/public/Distributed/DistributedActorSystem.swift

We'll be working on more docs for the distributed module though.

Basically, actorReady gets the concrete instance of a ready actor -- so you can check as? EventSourced and if it was, you can install all kinds of handlers and hooks for it.

1 Like

Yeah, true. Tbh when working with distributed actors I just start with same stuff you've done guys in Akka and then improve further. :upside_down_face: But yeah, completely makes sense to eliminate.

Yup. :+1: Actually just realised also makes sense to check classic akka :thinking:

Ah, missed ### Readying Distributed Actors part. :man_facepalming:

Ok, thx for feedback, already some good insights.

1 Like

Cool, good luck!

Yeah it makes sense to take inspiration from Akka, Orleans etc, and see how we can do better. By being part of the language and being really flexible with the actor system we can do some interesting things other runtimes can't really :slight_smile:

If we'll run into missing language features, we can always to to fix those as well :+1:

1 Like

State persistence is also would be interesting to cover at some point. Hides some complexity and works perfectly with virtual actors, I guess.

But guess it's designed mostly for small states.

Thinking out loud maybe one hypothetical approach could be same event sourcing underneath, but you just writing state's diffs and snapshotting from time to time. Not sure if it's possible though. :thinking:

No, you don’t “store diffs”. You always store events which if all are replayed end up at the same end state of the system.

When replays get too long, you store a snapshot and recover from it + replay all events from it. This is usually some everybody (hundred or thousand) events and is an optimization. It is crucial in every sourcing to always store just plain events basically :) everything else is optimizations.

You can choose to delete events before a snapshot if you really need to, but you have the option to do so without another model — it’s just events from a stable state (zero, or a snapshot).

1 Like

But applied to persistent state—you can basically recover to current state from history of diffs?
Anyway, it's just some thoughts while going through documentation and articles :upside_down_face: think for small state make sense just to rewrite it every time.

(though wondering how people choose between persistence state vs. event sourced actor in Orleans for example)

Finally had a time to work a bit and implement basic event sourcing:

  • As I need to hook to actorReady function of a system—implemented event sourcing directly in a fork of swift-distributed-actors
  • Here is an example of an actor which is event sourced Room
    Basically you just need two steps:
/// 1. Add plugin 
$0.plugins.install(
   ClusterJournalPlugin { MemoryEventStore() }
)
$0.plugins.install(ClusterSingletonPlugin()) // you'll also need singleton plugins as journal will wrap store into singleton.

// 2. Just conform some actor to `EventSourced` protocol
distributed actor Room: EventSourced {

   enum Event {
      case message(String)
   }

   distributed func send(message: String, from user: User) async throws {
      let event = Event.message(message)
      try await self.emit(event) // this will save event and update actor's state
      // do other stuff
   }

   func handleEvent(_ event: Event) {
      switch event {
         case .message(let message):
         // update state
      }
   }
}

Though it a small step, but already like how it gives you lots of flexibility with little effort.
As usual feedback is appreciated a lot and thx @ktoso already for support.

4 Likes

Just a small update:

We've created a PR for ClusterSystem to make plugins more flexible—Added hook to lifecycle events by akbashev · Pull Request #1150 · apple/swift-distributed-actors · GitHub
It enables to create a standalone package which can be hooked to actor events in system, which I've done for event sourcing.

So new link is now here: