In this section we will complete two tasks:
- Writing our persistent actors to persist trade events to LevelDB. (C of CQRS)
- Reading events from a shared event log and maintaining state. (Q of CQRS)
For this we will use two abstractions provided by Eventuate: the event-sourced actor and the event-sourced view.
Event-sourced Actor
An event-sourced actor is an actor that captures changes to its internal state as a sequence of events. It persists these events to an event log and replays them to recover internal state after a crash or a planned re-start. Its derived state is an in-memory write model, representing the command-side (C) of CQRS.
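To make the replay idea concrete, here is a minimal, library-free sketch in plain Scala. It does not use Eventuate, and the Trade fields as well as the names applyEvent and replay are assumptions made for illustration. It only shows that the current state is a left fold over the persisted events, so replaying the same log in the same order rebuilds the same state.

```scala
object ReplaySketch {
  // Hypothetical trade model; the real Trade class in this series may differ.
  final case class Trade(id: String, quantity: Int)

  sealed trait TradeEvent
  final case class TradeCreated(trade: Trade) extends TradeEvent
  final case class TradeUpdated(trade: Trade) extends TradeEvent

  // Event handler: computes the next state from the current state and one event.
  // Assumption: both events carry the full, latest version of the trade.
  def applyEvent(state: Option[Trade], event: TradeEvent): Option[Trade] =
    event match {
      case TradeCreated(trade) => Some(trade)
      case TradeUpdated(trade) => Some(trade)
    }

  // Recovery: fold the event handler over the log, oldest event first.
  def replay(log: Seq[TradeEvent]): Option[Trade] =
    log.foldLeft(Option.empty[Trade])(applyEvent)
}
```

In an event-sourced actor the same roles are played by the event log (the sequence of events) and the event handler (the function that is folded over it).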
Trade Actor: Persisting events
Let's start by writing a test case for TradeActor.
We want our trade actor to persist events when it receives CreateTrade and UpdateTrade commands.
The Spec class skeleton looks like this -
We are using akka-testkit for testing our actor interactions.
The test case creates a unique event log for itself so that other tests running in parallel won't affect it.
We will be doing three things in the test:
- Create a trade actor
- Send command messages
- Assert that we got the correct response
The complete test case looks like below -
Simple enough! One thing missing here is that we are not clearing the event log before each test run. I am leaving it as a TODO, but if you want to do it, you can write a before method that deletes the LevelDB directory.
Now let's take a look at our trade actor implementation.
Our trade actor class will extend EventsourcedActor.
When implementing an EventsourcedActor we need to override two methods: onCommand and onEvent.
We will maintain our current state in the tradeOpt variable.
There are three steps in the trade actor when creating or updating a trade:
- The onCommand handler handles the create/update command and calls persist with the respective event.
- The onEvent handler is invoked on successful persist. This is where we update our internal state.
- After onEvent completes, persist calls its handler, where we return a success or failure message to the sender.
That’s it. We are able to persist changes and maintain current state of our trade object now.
On restart, the onEvent handler is invoked with the persisted events in the same order in which they were originally handled, so we always arrive at the same state, even when the application restarts or the actor crashes.
Also, the event handler and the persist handler are called on a dispatcher thread of the actor. They can therefore safely access internal actor state, in our case tradeOpt.
The sender() reference of the original command sender is also preserved, so that a persist handler can reply to the initial command sender.
Event-sourced View
An event-sourced view is an actor that consumes events from its event log but cannot produce new events. Its derived state is an in-memory read model, representing the query-side (Q) of CQRS.
Trade Manager : Query side
Our trade manager will extend EventsourcedView.
It will route Create and Update commands to the respective trade actors based on trade id. It will also receive TradeCreated and TradeUpdated events asynchronously from the trade actor on successful persist to LevelDB. On receiving these events, our manager will update its internal state so that all queries can be implemented in the trade manager.
The spec class for the trade manager will be very similar to the one for our trade actor.
Let's go ahead and look at the completed spec.
Again we follow the same principle: send certain commands to our trade manager actor and expect the correct responses.
So our trade manager actor has to handle the four commands used in the spec and also keep the trades it has received in an in-memory Map, so that it can return individual trades or all of them.
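The in-memory read model can be sketched without any actor machinery. This is a plain Scala illustration under stated assumptions, not the trade manager from this series: the Trade fields, the name TradeReadModel and its methods are hypothetical, and TradeUpdated is assumed to carry the full updated trade just like TradeCreated.

```scala
object ReadModelSketch {
  // Hypothetical trade model; the real Trade class may have more fields.
  final case class Trade(id: String, quantity: Int)

  sealed trait TradeEvent
  final case class TradeCreated(trade: Trade) extends TradeEvent
  final case class TradeUpdated(trade: Trade) extends TradeEvent

  // Immutable read model: a Map keyed by trade id, changed only by events.
  final case class TradeReadModel(trades: Map[String, Trade] = Map.empty) {
    // Assumption: each event carries the latest version of the trade,
    // so it simply overwrites the entry for that trade id.
    def update(event: TradeEvent): TradeReadModel = event match {
      case TradeCreated(trade) => copy(trades = trades + (trade.id -> trade))
      case TradeUpdated(trade) => copy(trades = trades + (trade.id -> trade))
    }

    // Query for a single trade by id.
    def get(id: String): Option[Trade] = trades.get(id)

    // Query for all trades, sorted by id for a stable order.
    def all: Seq[Trade] = trades.values.toSeq.sortBy(_.id)
  }
}
```

Inside an actor, a model like this would typically live in a var that the onEvent handler reassigns on every event, while query commands only read from it.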
The trade manager above is complete, but it does not yet receive the events emitted by TradeActor in its onEvent handler.
Now we need the TradeActor to asynchronously send a message to the manager on successful persist.
Eventuate has got that covered for us with event routing.
Event Routing
An event that is emitted by an event-sourced actor or processor can be routed to other event-sourced components if they share an event log.
Eventuate defines the following routing rules:
- If an event-sourced component has an undefined aggregateId, all events are routed to it. It may choose to handle only a subset of them though.
- If an event-sourced component has a defined aggregateId, only events emitted by event-sourced actors or processors with the same aggregateId are routed to it.
In Eventuate the routing destinations are defined during emission of an event and are persisted together with the event. This makes routing decisions repeatable during event replay and allows for routing rule changes without affecting past routing decisions.
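The two default rules can be modelled as a small pure function. The following is a library-free sketch, not Eventuate's implementation: RoutedEvent, emit and isRoutedTo are hypothetical names, and the event payload is reduced to a String for brevity.

```scala
object RoutingSketch {
  // An event as stored in the log, together with the destinations
  // that were fixed when it was emitted.
  final case class RoutedEvent(payload: String, destinationAggregateIds: Set[String])

  // Emission: the emitter's aggregate id, if defined, becomes a destination.
  def emit(payload: String, emitterAggregateId: Option[String]): RoutedEvent =
    RoutedEvent(payload, emitterAggregateId.toSet)

  // Delivery decision for a component sharing the same event log.
  def isRoutedTo(event: RoutedEvent, componentAggregateId: Option[String]): Boolean =
    componentAggregateId match {
      case None     => true                                       // rule 1: undefined id sees every event
      case Some(id) => event.destinationAggregateIds.contains(id) // rule 2: ids must match
    }
}
```

In this model, an event emitted by a trade actor with aggregate id "trade-1" reaches components with an undefined aggregateId and components with aggregateId "trade-1", but not a component whose aggregateId is, say, "manager". Because the destinations are stored with the event, the decision comes out the same on every replay.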
The definition of the persist method looks like this:
final def persist[A](event: A,
                     customDestinationAggregateIds: Set[String] = Set()
                    )(handler: Handler[A]): Unit
Along with the event, we can pass a set of aggregate ids that should also receive the event on a successful persist.
So we will pass the manager's aggregate id to the persist method in TradeActor, and our trade manager will start receiving the create and update events.
class TradeActor(/* ... */) extends EventsourcedActor {
  // ...
  override def onCommand = {
    // Notice the manager's aggregate id passed as a custom destination
    case CreateTrade(trade) =>
      persist(TradeCreated(trade), Set(managerAggregateId)) {
        case Success(evt) =>
          sender() ! CreateTradeSuccess(trade)
        case Failure(cause) =>
          sender() ! CreateTradeFailure(cause)
      }
    // ...
  }
  // ...
}
All tests should pass after you have made the above change to the trade actor.
That’s it. With Eventuate we have implemented both the command side and the query side of our application.
What's Next
Next we will take a look at more event collaboration with eventuate and implementing websockets with Play.