Event Sourcing with scala, play framework and eventuate (Part 2 - Persisting Actor States)

In this section we will complete two tasks

  1. Writing our persistent actors to persist trade events to LevelDB. (C of CQRS)

  2. Reading events from a shared event log and maintain state. (Q of CQRS)

For this we will be using two abstractions provided by eventuate - Event-sourced Actor and Event-sourced View.

Event-sourced Actor

An event-sourced actor is an actor that captures changes to its internal state as a sequence of events. It persists these events to an event log and replays them to recover internal state after a crash or a planned re-start. Its derived state is an in-memory write model, representing the command-side (C) of CQRS.

Trade Actor: Persisting events

Lets start by writing a test case for TradeActor.

We want our trade actor to be able to persist events when it receives a CreateTrade & UpdateTrade commands.

The Spec class skeleton looks like this -

We are using akka-testkit for testing our actor interactions.

The test case creates a unique event log for itself so that other tests running in parallel wont affect this testcase.

We will be doing three things in the test,

The complete test case looks like below -

Simple enough! One thing missing here is that we are not clearing the event log before each test run. I am leaving it out as a TODO but if you want to do it, you can write before method which deletes the leveldb directory.

Now lets take a look at our trade actor implementation.

Our trade actor class will extend from EventSourcedActor. When implementing a EventSourcedActor we need to override two methods onCommand and onEvent. We will maintain out current state in tradeOpt variable.

There are three steps in Trade Actor when creating or updating a trade,

  1. onCommand handler will handle the create/update command and call persist with the respective event.

  2. The onEvent handler is invoked on successful persist. This is where we will update our internal state.

  3. After completion of onEvent the persist method calls the handler where we return success or failure message to the sender.

That’s it. We are able to persist changes and maintain current state of our trade object now.

On restart the onEvent handler is invoked in the same sequence as the events were received. So we will always have the same state even when the application restarts or actor crashes.

Also, event handler and persist handler are called on a dispatcher thread of the actor.

They can therefore safely access internal actor state. In our case the tradeOpt

The sender() reference of the original command sender is also preserved, so that a persist handler can reply to the initial command sender.

Event-sourced View

Event sourced view is an actor that consumes events from its event log but cannot produce new events. Its derived state is an in-memory read model, representing the query-side (Q) of CQRS.

Trade Manager : Query side

Our Trade manager will extend from Event-sourced view.

It will route Create and Update commands to respective trade actors based on trade id. It will also receive TradeCreated and TradeUpdated events asynchronously from the Trade Actor on successful persist to levelDB. On receiving these events our manager will update the internal state so that all queries can be implemented in trade manager.

The spec class for Trade Manager will be very similar to our trade Actor.

Lets go ahead and look at the completed spec.

Again here we follow same principle, send certain commands to our trade manager actor and expect correct responses.

So our Trade Manager actor will have to handle those four commands we mentioned in the spec and also keep in memory the list of trades received in a Map for retrieving individual or all trades.

The trade manager above is complete but it does not receive the onEvent calls from the TradeActor.

Now we need the TradeActor to asynchronously send message to the manager on successful persist.

Eventuate has got that covered for us with event routing.

Event Routing

An event that is emitted by an event-sourced actor or processor can be routed to other event-sourced components if they share an Event log.

Eventuate defines the following routing rules:

In eventuate the routing destinations are defined during emission of an event and are persisted together with the event. This makes routing decisions repeatable during event replay and allows for routing rule changes without affecting past routing decisions.

If you look at the definition of persist method its this -

final def persist[A](event: A,
  customDestinationAggregateIds: Set[String] = Set()
  )(handler: Handler[A]): Unit
      

Along with the event we can pass a set of aggregate ids who should receive the event on a successful persist.

So we will pass managers aggregate Id to the persist method in TradeActor and our trade manager will start receiving the create and update events.

class TradeActor(){ 
 //...Notice the managers aggregate id
case CreateTrade(trade) =>
      persist(TradeCreated(trade), Set(managerAggregateId)) {
        case Success(evt) =>
          sender() ! CreateTradeSuccess(trade)
        case Failure(cause) =>
          sender() ! CreateTradeFailure(cause)
      }

All tests should pass after you have made the above change to the trade actor.

That’s it, with eventuate we have easily implemented the command and the query for our application.

Whats Next

Next we will take a look at more event collaboration with eventuate and implementing websockets with Play.

Reference

http://rbmhtechnology.github.io/eventuate/reference.html