Aggregate store
In the current state of our system, here is what happens when we request an account:
- The system reads all events related to our account id
- Starting from an empty state, it applies all events to “rebuild” the latest state version
- It returns this latest state
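The steps above amount to a fold over the account's events. As a minimal, library-independent sketch (the `MoneyMoved` event and the `replay` function are hypothetical names invented for this illustration, not part of the event sourcing library):

```java
import java.math.BigDecimal;
import java.util.List;

class ReplaySketch {
    // Hypothetical, simplified event: a signed amount applied to one account.
    static final class MoneyMoved {
        final String accountId;
        final BigDecimal delta; // positive = deposit, negative = withdrawal

        MoneyMoved(String accountId, BigDecimal delta) {
            this.accountId = accountId;
            this.delta = delta;
        }
    }

    // Rebuild the balance of one account by applying each of its events in order.
    static BigDecimal replay(String accountId, List<MoneyMoved> journal) {
        BigDecimal balance = BigDecimal.ZERO; // the empty initial state
        for (MoneyMoved event : journal) {
            if (event.accountId.equals(accountId)) {
                balance = balance.add(event.delta);
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        List<MoneyMoved> journal = List.of(
                new MoneyMoved("acc-1", new BigDecimal("100")),
                new MoneyMoved("acc-2", new BigDecimal("50")),
                new MoneyMoved("acc-1", new BigDecimal("-30")),
                new MoneyMoved("acc-1", new BigDecimal("5")));
        System.out.println(replay("acc-1", journal)); // prints 75
    }
}
```

In this sketch every read walks the whole journal, so the cost of a read grows linearly with the number of stored events.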
In a real-world banking system, an account could have several thousand events.
In such a scenario, applying all events would have a high performance cost.
To fix this issue, one solution is to implement an AggregateStore.
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.control.Option;
// Imports for the event sourcing library classes, ActorSystem and Materializer are omitted here.

public class BankAggregateStore extends DefaultAggregateStore<Account, BankEvent, Tuple0, Tuple0, Connection> {

    public BankAggregateStore(EventStore<Connection, BankEvent, Tuple0, Tuple0> eventStore, EventHandler<Account, BankEvent> eventEventHandler, ActorSystem system, TransactionManager<Connection> transactionManager) {
        super(eventStore, eventEventHandler, system, transactionManager);
    }

    public BankAggregateStore(EventStore<Connection, BankEvent, Tuple0, Tuple0> eventStore, EventHandler<Account, BankEvent> eventEventHandler, Materializer materializer, TransactionManager<Connection> transactionManager) {
        super(eventStore, eventEventHandler, materializer, transactionManager);
    }

    // Insert or update the latest state of the account in the ACCOUNTS table.
    @Override
    public CompletionStage<Tuple0> storeSnapshot(
            Connection connection,
            String id,
            Option<Account> maybeState) {
        return CompletableFuture.supplyAsync(() -> {
            maybeState.peek(state -> {
                try (PreparedStatement statement = connection.prepareStatement("""
                        INSERT INTO ACCOUNTS(ID, BALANCE) VALUES(?, ?)
                        ON CONFLICT (id) DO UPDATE SET balance = ?
                        """)) {
                    statement.setString(1, id);
                    statement.setBigDecimal(2, state.balance);
                    statement.setBigDecimal(3, state.balance);
                    statement.execute();
                } catch (SQLException throwable) {
                    throw new RuntimeException(throwable);
                }
            });
            return Tuple.empty();
        });
    }

    // Read the latest state directly from the ACCOUNTS table instead of replaying events.
    @Override
    public CompletionStage<Option<Account>> getAggregate(Connection connection, String entityId) {
        return CompletableFuture.supplyAsync(() -> {
            try (PreparedStatement statement = connection.prepareStatement("SELECT balance FROM ACCOUNTS WHERE id=?")) {
                statement.setString(1, entityId);
                ResultSet resultSet = statement.executeQuery();
                if (resultSet.next()) {
                    BigDecimal amount = resultSet.getBigDecimal("balance");
                    Account account = new Account();
                    account.id = entityId;
                    account.balance = amount;
                    return Option.some(account);
                } else {
                    return Option.none();
                }
            } catch (SQLException throwable) {
                // SQLException is checked and cannot escape the lambda, so it is wrapped.
                throw new RuntimeException(throwable);
            }
        });
    }
}
This aggregate store maintains a parallel Accounts table in the database that stores the latest state of each account.
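To illustrate the idea independently of the library, here is a minimal in-memory sketch. The `SnapshotSketch` class and its methods are hypothetical names; a map stands in for the Accounts table and a list stands in for the event journal.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class SnapshotSketch {
    // Append-only list of (accountId, delta) entries, standing in for the event journal.
    private final List<Map.Entry<String, BigDecimal>> journal = new ArrayList<>();
    // Latest balance per account, standing in for the Accounts table.
    private final Map<String, BigDecimal> snapshots = new HashMap<>();

    // Write path: append the event, then refresh the snapshot (one extra write).
    void append(String accountId, BigDecimal delta) {
        journal.add(Map.entry(accountId, delta));
        snapshots.merge(accountId, delta, BigDecimal::add);
    }

    // Read path: a single lookup, with no replay of the journal.
    Optional<BigDecimal> currentBalance(String accountId) {
        return Optional.ofNullable(snapshots.get(accountId));
    }

    int journalSize() {
        return journal.size();
    }

    public static void main(String[] args) {
        SnapshotSketch store = new SnapshotSketch();
        store.append("acc-1", new BigDecimal("100"));
        store.append("acc-1", new BigDecimal("-30"));
        System.out.println(store.currentBalance("acc-1"));   // prints Optional[70]
        System.out.println(store.currentBalance("unknown")); // prints Optional.empty
    }
}
```

Each write now does two things (journal append and snapshot update), while a read no longer depends on how many events the account has accumulated.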
To add this aggregate store to our system, we pass it to our PostgresKafkaEventProcessor.
public class Bank {
    //...
    public Bank() {
        //...
        BankAggregateStore bankAggregateStore = new BankAggregateStore(eventStore, eventHandler, actorSystem, transactionManager);
        this.eventProcessor = new PostgresKafkaEventProcessor<>(new PostgresKafkaEventProcessor.PostgresKafkaEventProcessorConfig<>(
                eventStore,
                transactionManager,
                bankAggregateStore,
                commandHandler,
                eventHandler,
                List.of(meanWithdrawProjection),
                kafkaEventPublisher
        ));
    }
    //...
}
IMPORTANT NOTE: This aggregate store should only be used when reading state causes performance issues. It is a trade-off: we accept slower writes in exchange for faster reads.
To implement an alternative read model (i.e. CQRS), projections should be used.