A Simple Banking Example
This is the second of a four-part series of posts related to ChronosES, its implementation, and its usage.
- Basic usage (Hello, world!)
- In-depth usage (including a case-study of one of our own internal ChronosES applications!)
In this post, we're going to write and explain all of the code we would need for a (very simple) banking application using ChronosES as its backend.
There are two prerequisites that we have to satisfy in order to run an instance of the ChronosES service:
- Deploy a data store so that the service has somewhere to store all of its data
- Deploy the ChronosES indexing requirements to the filesystem
Deploying a data store
The default data store for ChronosES is Redis. We chose Redis for a few reasons:
- Easy to work with
- Provides transactional functionality (Lua scripting + MULTI/EXEC transactions)
- Has built in notification mechanism (Pub/sub)
In a future post, we'll show how the data store can be configured to use any other database of your choosing; for now, we'll stick with the default to keep things simple. By default, ChronosES will attempt to connect to a Redis instance at 127.0.0.1:6379 (the standard Redis default location). Getting a server running at that location is easy:
$ wget http://download.redis.io/releases/redis-3.2.3.tar.gz $ tar xzf redis-3.2.3.tar.gz $ cd redis-3.2.3 $ make $ src/redis-server
$ brew install redis $ brew services start redis
Deploying index requirements
ChronosES has an external indexing system that uses auto-generated SQLite instances. For this functionality to work properly, ChronosES expects a special directory structure to exist in /var/lib/chronos (which also functions as the working directory for the service). You should only have to deploy these requirements once, and doing so should be easy using facilities that come bundled along with the ChronosES pip installation:
$ virtualenv chronovirt $ source chronovirt/bin/activate $ pip install git+https://github.com/belvedere-trading/ChronosES $ chronos_deploy
You may need root privileges to run the chronos_deploy script on some systems (depending on the permissions of /var/lib); this script is purely additive and should not interfere with other functionality on your system.
Starting the service
With our prerequisites satisfied, we're ready to start ChronosES! Assuming we're still sourced into our virtualenv from the previous section, we can start the service simply by running its executable. We should then see some output like this:
$ chronoses 2016-08-18 18:35:18.314220 UTC; Severity=Information; Component=Chronos; Function=Gateway.main; Title="Chronos Hard Start"; ...
Now our service is ready to go!
We'll explore some of the basic ChronosES functionality by creating a simple banking application. We're going to keep things really simple here - the requirements that we want to satisfy are:
- People should be able to create new accounts with some initial deposit
- People with open accounts should be able to withdraw money
- People with open accounts should be able to deposit money
- People should be able to mark their account as closed if it has no balance
- Once closed, an account can no longer be modified
In order to start using ChronosES, we have to define our Aggregate and Events. As we know from the previous post, these definitions come in the form of a Protobuf file for our data model and a Python file for our logic. Let's take a look at what our banking files might contain.
Usually, we want to start by defining our data model. Ultimately, the data is what we're really interested in, so this is the most important part of our application in many ways. Without further ado, here's the Protobuf file for our application.
This file must be named BankAccount.proto. The ChronosES service uses "convention over configuration" when it comes to data model naming; the Aggregate name, package name, and file name are required to all match exactly. One interesting thing to notice in this definition is that we have one Event model defined for each action that we want users to be able to make within our application - this is very common in ChronosES.
It's useful to think of the Aggregate as the current snapshot of our application's state. The Events are smaller pieces of data that cause changes in the state. To withdraw some amount from a BankAccount, for example, we only need to know the amount to withdraw (and not, say, the owner of the account).
While our data model looks great, it can't really do much on its own. We still have to define the logic that wires each Event up to its corresponding action. In order to do this, we supply a simple Python file.
There's a lot going on in that file - let's break it down one section at a time.
We have to import classes from three different modules:
from sqlalchemy import Column, String, UniqueConstraint from Chronos.Core import (Aggregate, Event, ChronosIndex, ValidationError) from BankAccount_pb2 import (BankAccount, CreateEvent, DepositEvent, WithdrawEvent, CloseEvent)
The sqlalchemy classes will be used to build our index, the Chronos.Core classes provide us with required infrastructural classes, and finally BankAccount_pb2 is the generated Protobuf code from BankAccount.proto.
class BankAccountIndex(ChronosIndex): owner = Column(String) constraints = [UniqueConstraint('owner', name='ux')]
Every ChronosES Aggregate requires an index definition in the form of a SQLAlchemy declarative base class. Chronos.Core.ChronosIndex is the abstract base for all Aggregate indices. In this case, we see that the BankAccount Aggregate can be indexed by only a single field "owner", and that the "owner" field should be unique across all Aggregate instances.
Defining good indices is very important for a ChronosES Aggregate. The fields that you include in your index will be the only way for you to query for specific Aggregate instances other than their primary id. We'll see an example of how to use this index later on.
class BankAccount(Aggregate): Proto = BankAccount Index = BankAccountIndex def IsValid(self): return self.owner and self.balance >= 0
This definition tells us three things:
- The Protobuf message (which specifies the available data) for the Aggregate is BankAccount.
- The Index for the Aggregate is the BankAccountIndex that we just defined.
- How to determine whether or not an Aggregate instance is logically valid.
Providing a good definition for IsValid is very important; it will be called on updated Aggregate instances after each Event is applied, but before the update is persisted. This ensures that Aggregate instances are never persisted in a "bad" state. For example, a negative account balance or a missing owner would be no good!
The remainder of our logic definition are implementations of the various Events that we have. We'll use two of the implementations to explain:
class CreateEvent(Event): Aggregate = BankAccount Proto = CreateEvent def RaiseFor(self, aggregate): # This is the method that ChronosES will apply # each time an Event is sent if self.version != 1: raise ValidationError('Account already created') aggregate.owner = self.owner aggregate.balance = self.amount def _ensureCreated(event): if event.version == 1: raise ValidationError('Account has not been created') def _ensureOpen(aggregate): if aggregate.isClosed: raise ValidationError('Account is closed') class DepositEvent(Event): Aggregate = BankAccount Proto = DepositEvent def RaiseFor(self, aggregate): _ensureCreated(self) _ensureOpen(aggregate) if self.amount <= 0: raise ValidationError('Deposit amount <= 0') aggregate.balance += self.amount
When defining an Event, you need to supply three pieces of information:
- The Aggregate that the Event is defined for.
- The Protobuf message that contains the data model for the Event.
- The logic for Event application (the RaiseFor method).
When ChronosES receives Events from clients, it determines which Aggregate the Event should be mapped to, finds the appropriate Aggregate instance for the Event, then enqueues the Event for processing. Once all previous Events have been processed, RaiseFor is called with the current Aggregate instance's state.
With these definitions in place, we have almost everything we need to start using our new system. There's only one thing left to do: write a thin client and start sending Events to ChronosES!
Writing a client and sending Events
The amount of work required to define a ChronosES client depends entirely on how much logic you need. Generally, we suggest keeping business logic decoupled from the ChronosES transport by using callbacks to forward data to pure business logic processing functions. For the sake of keeping things simple, we're going to put almost no logic at all into our client; when we receive notifications of Event completions, we're just going to print the results to stdout. Here's our client definition.
In our client definition, we're just wiring up a couple of callbacks so that our application knows what to do when we receive Event notifications. Assuming we named our client file "BankAccountClient.py" and saved it in the same directory as our data model and logic file, we're now ready to start using ChronosES! Here's an example session of some interactive use of the service:
➜ examples git:(master) CHRONOS_SCRIPT_LOCATION=. python Python 2.7.10 (default, Oct 23 2015, 19:19:21) [GCC 4.2.1 Compatible Apple LLVM 7.0.0 (clang-700.0.59.5)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> from BankAccountClient import BankAccountClient >>> client = BankAccountClient() >>> response = client.Register() >>> print response.responseCode 0 >>> client.Subscribe() # Don't forget this! >>> client.CreateAccount('Martin Fowler', 100) >>> Success: 1: "Martin Fowler" $100, Open >>> account = client.GetAggregateById(1) >>> account.aggregateId 1L >>> account.aggregate.owner u'Martin Fowler' >>> account.aggregate.balance 100 >>> account.aggregate.isClosed False >>> client.Deposit(1, 50) >>> Success: 1: "Martin Fowler" $150, Open >>> account = client.GetAggregateById(1) >>> account.aggregate.balance 150 >>> martins = client.GetAggregatesByIndex(owner='Martin Fowler') >>> martins [AggregateResponse(aggregateId=1L, version=2L)] >>> client.GetAggregatesByIndex(owner='Elon Musk')  >>> client.Withdraw(1, -10) Failure: Withdrawl amount must be greater than 0 >>> client.Withdraw(5, -10) Failure: Unknown aggregateId 5 for BankAccount.BankAccount >>> client.Withdraw(1, 160) Failure: Aggregate was put into an invalid state >>> client.CloseAccount(4) Failure: Cannot close an account with a balance > 0 >>> client.Withdraw(1, 150) Success: 1: "Martin Fowler" $0, Open >>> client.CloseAccount(1) >>> Success: 1: "Martin Fowler" $0, Closed client.Deposit(1, 10) >>> Failure: Account is closed
With that, you should be all set to start writing your own ChronosES applications!
In the next post, we'll go into more depth about the ChronosES architecture and some more advanced features. Until then, have fun playing with ChronosES!