A framework for building event-sourced, CQRS applications in Kotlin.
Event-sourcing is an architectural paradigm wherein application state is modelled and stored as an immutable sequence of semantic events which are meaningful in your application's domain.
CQRS, Command/Query Responsibility Segregation, describes a pattern in which write (command) actions and read (query) actions are codified in entirely separately classes, models and pathways through your system.
Used in tandem, event-sourcing and CQRS provide a powerful and flexible architectural pattern. In an event-sourced, CQRS system, writes typically happen via an event-centric domain model, also known as "Aggregates", and these changes propagate through to "projections" of those events to be read from by the view side of the application. Events are thus considered the source of truth, while projections are disposable and can be rebuilt by reprocessing the historical events.
Kestrel is a Kotlin Event-Sourcing and CQRS framework that strives for:
- Minimalism - lack of boilerplate
- Expressiveness - expressing domain rules well
- Robustness - help you not make mistakes, primarily through strong typing
Here's an example of how an Aggregate might look in Kestrel:
data class SurveyAggregate(val name: Map<Locale, String>, val accountId: UUID, val deleted: Boolean = false) {
constructor(event: Created): this(event.name, event.accountId)
companion object {
fun create(query: SurveyNamesQuery, command: SurveyCreationCommand): Either<SurveyError, Created> = when (command) {
is CreateSurvey -> when {
command.name.any { (locale, name) -> query.nameExistsFor(command.accountId, name, locale)} -> Left(SurveyNameNotUnique)
else -> Right(Created(command.name, command.accountId, command.createdAt))
}
}
}
fun updated(event: SurveyUpdateEvent): SurveyAggregate = when (event) {
is Renamed -> this.copy(name = name (event.locale to event.name))
is Deleted -> this.copy(deleted = true)
is Restored -> this.copy(deleted = false)
}
fun update(query: SurveyNamesQuery, command: SurveyUpdateCommand): Either<SurveyError, List<SurveyUpdateEvent>> = when (command) {
is Rename -> when {
name.get(command.locale) == command.newName -> Left(AlreadyRenamed)
query.nameExistsFor(accountId, command.newName, command.locale) -> Left(SurveyNameNotUnique)
else -> Right.list(Renamed(command.newName, command.locale, command.renamedAt))
}
is Delete -> when (deleted) {
true -> Left(AlreadyDeleted)
false -> Right.list(Deleted(command.deletedAt))
}
is Restore -> when (deleted) {
true -> Right.list(Restored(command.restoredAt))
false -> Left(NotDeleted)
}
}
}
sealed class SurveyCommand : Command
sealed class SurveyCreationCommand : SurveyCommand(), CreationCommand
data class CreateSurvey(override val aggregateId: UUID, val surveyCaptureLayoutAggregateId: UUID, val name: Map<Locale, String>, val accountId: UUID, val createdAt: DateTime) : SurveyCreationCommand()
sealed class SurveyUpdateCommand : SurveyCommand(), UpdateCommand
data class Rename(override val aggregateId: UUID, val newName: String, val locale: Locale, val renamedAt: DateTime) : SurveyUpdateCommand()
data class Delete(override val aggregateId: UUID, val deletedAt: DateTime) : SurveyUpdateCommand()
data class Restore(override val aggregateId: UUID, val restoredAt: DateTime) : SurveyUpdateCommand()
sealed class SurveyEvent : DomainEvent
data class Created(val name: Map<Locale, String>, val accountId: UUID, val createdAt: DateTime) : SurveyEvent(), CreationEvent
sealed class SurveyUpdateEvent : SurveyEvent(), UpdateEvent
data class Renamed(val name: String, val locale: Locale, val namedAt: DateTime) : SurveyUpdateEvent()
data class Deleted(val deletedAt: DateTime) : SurveyUpdateEvent()
data class Restored(val restoredAt: DateTime) : SurveyUpdateEvent()
sealed class SurveyError : DomainError
object SurveyNameNotUnique : SurveyError()
object AlreadyRenamed : SurveyError(), AlreadyActionedCommandError
object AlreadyDeleted : SurveyError(), AlreadyActionedCommandError
object NotDeleted : SurveyError(), AlreadyActionedCommandError
enum class Locale {
en, de
}
- Aggregate - The domain entity that commands interact with and to which events happen. All events happen to a "thing" and this is that thing, a context in which to group events. A system will often have multiple aggregates.
- Command - A request to change the system via an event on an aggregate. May be accepted or denied based on business rules.
- Event -
A "semantic" domain event that has happened. Events can't be undone once they have happened, and can't be
blocked like commands. Events exist in an immutable event stream and once they exist need to be supported forever. At an
implementation detail an
Event
is a wrapper around aDomainEvent
with additional metadata attached. - CommandGateway -
The interface through which commands make their way through to aggregates. It's responsible for routing commands
to aggregates, and orchestrates the loading and saving of aggregates through events and the
EventStore
. - EventStore -
Implements two interfaces, an
EventSink
, for saving events for aggregates, and anEventSource
, for retrieving those events. In general, the event store should only ever be written to via theCommandGateway
and read from via anEventProcessor
. Kestrel provides support for a postgres backed event store out of the box. - EventProcessor -
Provides an abstraction over any event-processor, for example a
Projector
or aReactor
. This takes care of filtering out any irrelevant events from being passed to said projectors or reactors. - Projector - An event processor that merely updates a "projection" of the data. Should always be disposable and re-runnable from the beginning of the event-sequence. Should be built in an idempotent fashion since event-sourced systems favour asynchronous, distributed systems where it becomes more and more impossible to create perfect transactions. Build these as if they are an at-least-once delivery of events.
- Reactor - Like a projector but has side effects, for example sending
Commands
toAggregates
via theCommandGateway
, or sending emails, etc. Best efforts should also be made to make these idempotent and re-runnable from the beginning of the event-sequence, although in practice this tends to be difficult. - AsyncEventProcessor -
Wraps an
EventProcessor
with logic to read events from anEventSource
, dispatch events to theEventProcessor
, and update a "bookmark" representing the sequence number of the last processed event in aBookmarkStore
. - BookmarkStore -
Stores the last processed sequence number as a bookmark for a given
EventProcessor
. - AsyncEventProcessorMonitor -
Provides a mechanism to establish how far
EventProcessor
bookmarks/processing is lagging behind the head of the event stream.
See the sample app for an example of creating a web-app built on top of this framework.
Gradle:
dependencies {
implementation "com.cultureamp:kestrel:{kestrel_version}"
}
KES offers multiple ways of defining your aggregates depending on your needs.
The simplest way to get started is to use the SimpleAggregate[Constructor]
interface. If you are not sure which aggregate creation method to use, we recommend this option.
For example:
data class SimpleThingAggregate(val tweaks: List<String> = emptyList(), val boops: List<Booped> = emptyList()) :
SimpleAggregate<SimpleThingUpdateCommand, SimpleThingUpdateEvent> {
companion object :
SimpleAggregateConstructor<SimpleThingCreationCommand, SimpleThingCreationEvent, SimpleThingUpdateCommand, SimpleThingUpdateEvent> {
override fun created(event: SimpleThingCreationEvent) = when(event) {
is SimpleThingCreated -> SimpleThingAggregate()
}
override fun create(command: SimpleThingCreationCommand) = when(command){
is CreateSimpleThing -> Right(SimpleThingCreated)
}
}
override fun updated(event: SimpleThingUpdateEvent) = when(event){
is Twerked -> this.copy(tweaks = tweaks event.tweak)
is Booped -> this.copy(boops = boops event)
}
override fun update(command: SimpleThingUpdateCommand) = when(command) {
is Twerk -> Right.list(Twerked(command.tweak))
is Boop -> Right.list(Booped)
is Bang -> Left(Banged)
}
}
This can then by wired into your CommandGateway
like so:
val commandGateway = CommandGateway(eventStore, Route.from(SimpleThingAggregate))
If you need access to dependencies during command-handling, or want finer grained control over returned error types or "self" types, there are a few different interfaces you can use:
SimpleAggregate[Constructor]WithProjection
as above but with access to a dependency during command-handling.Aggregate[Constructor]
explicit error and self types.Aggregate[Constructor]WithProjection
as above but with access to a dependency during command-handling.
If you prefer, you can also model your aggregates in a more functional-programming style using a group of related functions. This is useful for when you want more control over how you write your aggregates, for example to utilize the constructor of your class (not possible via interfaces), to only inject a dependency into one of the two command-handling methods, when you don't want to clutter your domain code with loads of generics but still want rich types, or if you simply just prefer thinking in functions.
Here's an example:
data class SurveyAggregate(val name: Map<Locale, String>, val accountId: UUID, val deleted: Boolean = false) {
constructor(event: Created): this(event.name, event.accountId)
companion object {
fun create(query: SurveyNamesQuery, command: SurveyCreationCommand): Either<SurveyError, Created> = when (command) {
is CreateSurvey -> when {
command.name.any { (locale, name) -> query.nameExistsFor(command.accountId, name, locale)} -> Left(SurveyNameNotUnique)
else -> Right(Created(command.name, command.accountId, command.createdAt))
}
}
}
fun updated(event: SurveyUpdateEvent): SurveyAggregate = when (event) {
is Renamed -> this.copy(name = name (event.locale to event.name))
is Deleted -> this.copy(deleted = true)
is Restored -> this.copy(deleted = false)
}
fun update(query: SurveyNamesQuery, command: SurveyUpdateCommand): Either<SurveyError, List<SurveyUpdateEvent>> = when (command) {
is Rename -> when {
name.get(command.locale) == command.newName -> Left(AlreadyRenamed)
query.nameExistsFor(accountId, command.newName, command.locale) -> Left(SurveyNameNotUnique)
else -> Right.list(Renamed(command.newName, command.locale, command.renamedAt))
}
is Delete -> when (deleted) {
true -> Left(AlreadyDeleted)
false -> Right.list(Deleted(command.deletedAt))
}
is Restore -> when (deleted) {
true -> Right.list(Restored(command.restoredAt))
false -> Left(NotDeleted)
}
}
}
This can then by wired into your CommandGateway
like so:
val commandGateway = CommandGateway(
eventStore,
Route.from(
SurveyAggregate.Companion::create.partial(SurveyNameAlwaysAvailable),
SurveyAggregate::update.partial2(SurveyNameAlwaysAvailable),
::SurveyAggregate,
SurveyAggregate::updated
)
)
If you happen to have a "stateless" aggregate that doesn't need to update its internal state to handle commands, you can model that too:
object PaymentSagaAggregate {
fun create(command: StartPaymentSaga): Either<DomainError, PaymentSagaStarted> = with(command) {
Right(PaymentSagaStarted(fromUserId, toUserBankDetails, dollarAmount, DateTime()))
}
fun update(command: PaymentSagaUpdateCommand): Either<DomainError, List<PaymentSagaUpdateEvent>> = when (command) {
is StartThirdPartyPayment -> Right.list(StartedThirdPartyPayment(command.startedAt))
is RegisterThirdPartySuccess -> Right.list(FinishedThirdPartyPayment(DateTime()))
is RegisterThirdPartyFailure -> Right.list(FailedThirdPartyPayment(DateTime()))
is StartThirdPartyEmailNotification -> Right.list(StartedThirdPartyEmailNotification(command.message, command.startedAt))
}
}
val gateway = CommandGateway(
eventStore,
Route.fromStateless(
PaymentSagaAggregate::create,
PaymentSagaAggregate::update,
PaymentSagaAggregate
)
)
Kestrel offers multiple ways of defining your event-processors depending on your needs.
The simplest way to get started is to use the DomainEventProcessor
interface. If you don't think you'll need access to the event metadata, we recommend this option.
For example:
class SurveyNamesCommandProjector(private val database: Database): DomainEventProcessor<SurveyEvent> {
override fun process(event: SurveyEvent, aggregateId: UUID): Unit = transaction(database) {
when (event) {
is Created -> event.name.forEach { locale, name ->
SurveyNames.insert {
it[surveyId] = aggregateId
it[accountId] = event.accountId
it[SurveyNames.locale] = locale
it[SurveyNames.name] = name
}
}
is Renamed ->
SurveyNames.update({ SurveyNames.surveyId eq aggregateId }) {
it[locale] = event.locale
it[name] = event.name
}
is Deleted ->
SurveyNames.deleteWhere { SurveyNames.surveyId eq aggregateId }
is Restored -> Unit
}
}
init {
transaction(database) {
SchemaUtils.create(SurveyNames)
}
}
}
object SurveyNames : Table() {
val surveyId = uuid("survey_id")
val accountId = uuid("account_id")
val locale = enumerationByName("locale", 10, Locale::class)
val name = text("name").index()
}
This can then by wired into your application like so:
val projector = SurveyNamesCommandProjector(database)
val bookmarkName = "SurveyNames"
val eventProcessor = EventProcessor.from(projector)
If you want to process aynchronously you can do something like:
val asyncEventProcessor = BatchedAsyncEventProcessor(eventStore, bookmarkStore, bookmarkName, eventProcessor)
thread(start = true, isDaemon = false, name = asyncEventProcessor.bookmarkName) {
ExponentialBackoff(
onFailure = { e, _ -> println(e) }
).run {
asyncEventProcessor.processOneBatch()
}
}
Or, if you must, you can run it synchronously like:
val eventStore = RelationalDatabaseEventStore.create(listOf(eventProcessor), database)
If you need access to the event metadata during handling, you can use the slightly more verbose interface
DomainEventProcessor
If you prefer, you can also write your event-processor in an interface agnostic way. This is useful for when you want more control over how you write your event-processors, for example writing a single class that handles two or more unrelated domain event types (not possible via interfaces), or if you just don't like interfaces. For example:
class AnimalProjector(val database: Database) {
fun first(event: CatAggregateEvent) = transaction(database) {
when (event) {
is CatNamed -> {
AnimalNames.insert {
it[name] = event.name
it[type] = "cat"
}
}
is CatFed -> Unit
}
}
fun second(event: DogAggregateEvent) = transaction(database) {
when (event) {
is DogNamed -> {
AnimalNames.insert {
it[name] = event.name
it[type] = "dog"
}
}
is DogBarked -> Unit
}
}
}
This can then by wired into your application like so:
val animalProjector = AnimalProjector(database)
val eventProcessor = EventProcessor.compose(
EventProcessor.from(animalProjector::first),
EventProcessor.from(animalProjector::second)
)
val bookmarkName = "AnimalNames"
val asyncEventProcessor = BatchedAsyncEventProcessor(eventStore, bookmarkStore, bookmarkName, eventProcessor)
Using EventProcessor#compose
allows one to wrap up the two event-handling methods as one EventProcessor
which then
allows the sharing of a single bookmark.
When running AsyncEventProcessors
, it becomes important to be able to monitor where each of these are up to in the
event stream. You can do this using the AsyncEventProcessorMonitor
val asyncEventProcessors: List<AsyncEventProcessor> = ...
thread(start = true, isDaemon = false, name = "eventProcessorMonitor") {
val eventProcessorMonitor = AsyncEventProcessorMonitor(asynchronousEventProcessors) {
println("msg='Lag calculation for event-processor' name='${it.name}' lag=${it.lag} bookmarkSequence=${it.bookmarkSequence} lastSequence=${it.lastSequence}")
}
ExponentialBackoff(
idleTimeMs = 60_000,
failureBackoffMs = { 60_000 },
onFailure = { throwable, _ -> println(throwable) }
).run {
eventProcessorMonitor.run()
Action.Wait
}
}