Skip to content

Instantly share code, notes, and snippets.

@xrrocha
Last active May 9, 2022 00:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xrrocha/0b840e1244e3d14ceb18a8a7f9e682d7 to your computer and use it in GitHub Desktop.
Save xrrocha/0b840e1244e3d14ceb18a8a7f9e682d7 to your computer and use it in GitHub Desktop.
interface Command { fun applyTo(system: Any) }
interface Query { fun extractFrom(system: Any): Any?}
interface EventSourcing {
fun append(event: Any)
fun <E> replay(eventConsumer: (E) -> Unit)
}
interface TxManager {
fun begin()
fun <T> remember(who: Any, what: String, value: T, undo: (T) -> Unit)
fun rollback()
companion object: TxManager { ... }
}
class MemoryImageProcessor(private val system: Any,
private val eventSourcing: EventSourcing)
{
init {
synchronized(this) {
// Replay serialized events to restore in-memory state
eventSourcing.replay<Command> { command -> command.applyTo(system) }
}
}
// Apply incoming command to system, single-threaded
fun execute(command: Command): Unit = synchronized(this) {
TxManager.begin()
try {
command.applyTo(system) // Try and apply command
try {
// Serialize; retry internally if needed
eventSourcing.append(command)
} catch (e: Exception) {
// No attempt to rollback; irrecoverable
logger.severe("Error persisting: ${e.message}")
throw e
}
} catch (e: Exception) {
TxManager.rollback() // Undo any partial mutation
// It's (kinda) ok for a command to fail; rethrow
throw CommandApplicationException("Command error", e)
}
}
// Run incoming query on system, multi-threaded
fun execute(query: Query): Any? = query.extractFrom(system)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment