Handling concurrency in Thoth
There is two main ways to handle concurrency in Thoth: * using the sequence num : you can send sequence number in your commands and raise an error in the command handler if the sequence number is lower than the current one * using a locking mechanism : You can configure the aggregate with a lock option
The lock option provided by Thoth
The lock option is base on the “select for update” feature provided by postgresql.
There is 3 options available:
- NO_STRATEGY : this is the default option, no lock is used
- WAIT_ON_LOCK : the concurrent command will wait for the lock to be released before executing
- FAIL_ON_LOCK : the concurrent command will fail if the lock is already taken
Exemple :
this.eventProcessor = ReactivePostgresKafkaEventProcessor
.withPgAsyncPool(pgAsyncPool)
.withTables(tableNames())
.withTransactionManager()
.withEventFormater(BankEventFormat.bankEventFormat.jacksonEventFormat())
.withNoMetaFormater()
.withNoContextFormater()
.withKafkaSettings("bank", producerSettings(settings()))
.withWaitConcurrentReplayStrategy()
.withEventHandler(eventHandler)
// Set the concurrency strategy for the aggregate store :
.withDefaultAggregateStore(ReadConcurrencyStrategy.WAIT_ON_LOCK)
.withCommandHandler(commandHandler)
.withProjections(this.withdrawByMonthProjection)
.build();