Skip to content

Instantly share code, notes, and snippets.

@dragisak
Last active August 29, 2015 14:14
Show Gist options
  • Save dragisak/05329c03c4bb67d547b3 to your computer and use it in GitHub Desktop.
Save dragisak/05329c03c4bb67d547b3 to your computer and use it in GitHub Desktop.
PostgreSQL journal for Akka Persistence in Play! app
# Create tables for event sourcing.
-- Table `events`: one row per domain event, identified by (event_id, seq). The optional,
-- event-specific part of an event is stored as JSON in `payload`; `deleted` is a soft-delete
-- flag that defaults to false. The Downs section drops the table again.
-- NOTE(review): these notes are deliberately kept above the "!Ups" marker so the executed SQL
-- text stays unchanged; presumably text before the first marker is ignored by the evolutions
-- runner -- confirm for the Play version in use.
# --- !Ups
CREATE TABLE events
(
event_id uuid NOT NULL,
seq bigint NOT NULL,
event_type character varying(12) NOT NULL,
user_id bigint NOT NULL,
org_id bigint NOT NULL,
payload json,
deleted boolean NOT NULL DEFAULT false,
created_at timestamp NOT NULL,
CONSTRAINT events_pk PRIMARY KEY (event_id, seq)
)
WITH (
OIDS=FALSE
);
COMMENT ON COLUMN events.seq IS 'Event log sequence number. Unique and increasing for event_id.';
COMMENT ON COLUMN events.event_type IS 'Defines what type of event this row is';
COMMENT ON COLUMN events.user_id IS 'ID of the user performing the operation';
COMMENT ON COLUMN events.org_id IS 'ID of the org this event belongs to';
COMMENT ON COLUMN events.payload IS 'Variable part of event sourcing event. Optional.';
COMMENT ON COLUMN events.deleted IS 'Flag that allows us mark events as deleted. Akka persistence will ignore deleted events.';
# --- !Downs
DROP TABLE events;
# Create tables for event sourcing.
-- Table `akka_system_events`: one row per persisted message that is not a domain event,
-- identified by (persistence_id, seq). `payload` holds the message as an opaque byte array
-- (bytea, NOT NULL); `deleted` is a soft-delete flag that defaults to false.
-- The Downs section drops the table again.
-- NOTE(review): these notes are deliberately kept above the "!Ups" marker so the executed SQL
-- text stays unchanged; presumably text before the first marker is ignored by the evolutions
-- runner -- confirm for the Play version in use.
# --- !Ups
CREATE TABLE akka_system_events
(
persistence_id character varying(255) NOT NULL,
seq bigint NOT NULL,
payload bytea NOT NULL,
deleted boolean NOT NULL DEFAULT false,
created_at timestamp NOT NULL,
CONSTRAINT system_events_pk PRIMARY KEY (persistence_id, seq)
)
WITH (
OIDS=FALSE
);
# --- !Downs
DROP TABLE akka_system_events;
package persistence
import java.sql.{PreparedStatement, ResultSet}
import java.util.UUID
import util.Time
import actor.EventActor
import akka.persistence._
import akka.persistence.journal.SyncWriteJournal
import akka.serialization.SerializationExtension
import domain.events.{EventDraftCreated, Evt, TitleUpdated}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.postgresql.util.PGobject
import play.api.Logger
import play.api.Play.current
import play.api.db._
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.json._
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.language.postfixOps
import scala.util._
class PostgresJournal extends SyncWriteJournal {
/**
* Discriminator strings stored in the `event_type` column of the `events` table, one per
* supported domain event class. They are written when binding inserts and matched on when
* rebuilding events from rows.
*/
private object EventType {
final val EventDraftCreated = "CREATE"
final val TitleUpdated = "TITLE"
}
/** Serialization extension of the actor system exposed by Play's `Akka` helper; resolved lazily on first use. */
private lazy val serialization = SerializationExtension(Akka.system)
/**
* Akka system messages are private classes. All we can do is serialize them as byte arrays.
*
* NOTE(review): `findSerializerFor` is handed the `Class` object `classOf[AnyRef]` itself rather
* than a message instance, so the serializer is looked up for that object -- confirm this
* resolves to the serializer you intend before changing it, since already-stored rows depend on it.
*/
private lazy val defaultSerializer = serialization.findSerializerFor(classOf[AnyRef])
/** Formatter used by `timestampObject` to render a `DateTime` as text with millisecond precision and no zone information. */
private val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
/**
 * Wraps a Joda `DateTime` in a `PGobject` typed as `timestamp`, so it can be passed to
 * `PreparedStatement.setObject`.
 *
 * @param ts instant to convert; rendered with `dateFormatter`
 * @return a `PGobject` of type `timestamp` whose value is the formatted text of `ts`
 */
private def timestampObject(ts: DateTime): PGobject = {
  val rendered = ts.toString(dateFormatter)
  val pg = new PGobject()
  pg.setType("timestamp")
  pg.setValue(rendered)
  pg
}
/**
 * Persists a batch of messages in one transaction using a single batched INSERT.
 *
 * The target table (and therefore the INSERT statement) is chosen once, from the
 * `persistenceId` of the first message; every message of the batch is bound to that statement.
 * An empty batch is a no-op.
 *
 * NOTE(review): a batch whose payload kinds do not match the table chosen from the first
 * message would bind the wrong number of parameters -- presumably callers never mix them; confirm.
 *
 * @param messages messages to persist, in write order
 */
override def writeMessages(messages: Seq[PersistentRepr]): Unit = if (messages.nonEmpty) {
  DB.withTransaction { conn =>
    val sql =
      if (EventActor.isEventStateMessage(messages.head.persistenceId)) insertEventSql
      else insertSystemEventSql
    val pstmt = conn.prepareStatement(sql)
    // Close the statement explicitly: the connection may be pooled, in which case returning
    // it to the pool is not guaranteed to release the statement.
    try {
      messages.foreach { msg =>
        val bind: PartialFunction[Any, Unit] =
          bindEventManagerEvt(msg, pstmt) orElse bindSystemEvent(msg, pstmt)
        bind(msg.payload)
        // For a null payload the binders only log "will not be persisted" and set no
        // parameters; adding that row to the batch would reuse stale or unbound values.
        if (msg.payload != null) {
          pstmt.addBatch()
        }
      }
      pstmt.executeBatch()
    } finally {
      pstmt.close()
    }
  }
}
/** INSERT for the `events` table; parameter order must match the bindings in `bindEventManagerEvt`. */
private val insertEventSql = "INSERT INTO events(event_id, seq, event_type, org_id, payload, user_id, created_at) VALUES (?,?,?,?,?,?,?)"

/**
 * This is where you would put code to persist Event Manager event messages to `events` table.
 *
 * Binds the seven parameters of `insertEventSql` for the supported domain events. The
 * returned function is undefined for any other payload, so it can be chained with `orElse`.
 *
 * @param msg   envelope of the message being written; supplies the sequence number
 * @param pstmt statement prepared from `insertEventSql`
 * @return partial function that binds `pstmt` from a domain event payload
 */
private def bindEventManagerEvt(msg: PersistentRepr, pstmt: PreparedStatement): PartialFunction[Any, Unit] = {
  case evt: TitleUpdated =>
    pstmt.setObject(1, evt.eventId)
    pstmt.setLong(2, msg.sequenceNr)
    pstmt.setString(3, EventType.TitleUpdated)
    pstmt.setLong(4, evt.orgId)
    val jsonObject = new PGobject()
    jsonObject.setType("json")
    // Build the document with the JSON library so quotes, backslashes and control
    // characters in the title are escaped instead of producing invalid JSON.
    jsonObject.setValue(Json.stringify(Json.obj("title" -> evt.title)))
    pstmt.setObject(5, jsonObject)
    pstmt.setLong(6, evt.userId)
    pstmt.setObject(7, timestampObject(evt.createdAt))
  case evt: EventDraftCreated =>
    pstmt.setObject(1, evt.eventId)
    pstmt.setLong(2, msg.sequenceNr)
    pstmt.setString(3, EventType.EventDraftCreated)
    pstmt.setLong(4, evt.orgId)
    // This event has no variable part, so the payload column is left NULL.
    pstmt.setObject(5, null)
    pstmt.setLong(6, evt.userId)
    pstmt.setObject(7, timestampObject(evt.createdAt))
}
/** INSERT for the `akka_system_events` table; parameter order must match `bindSystemEvent`. */
private val insertSystemEventSql = "INSERT INTO akka_system_events(persistence_id, seq, payload, created_at) VALUES (?,?,?,?)"

/**
 * Akka sharding messages are serialized as byte array.
 *
 * Binds the four parameters of `insertSystemEventSql`: persistence id, sequence number,
 * the payload serialized with `defaultSerializer`, and the current time. A payload that is
 * not an object reference is only logged and nothing is bound.
 *
 * @param msg   envelope of the message being written
 * @param pstmt statement prepared from `insertSystemEventSql`
 * @return partial function defined for every payload
 */
private def bindSystemEvent(msg: PersistentRepr, pstmt: PreparedStatement): PartialFunction[Any, Unit] = {
  case payload: AnyRef =>
    // Serialize first so a serialization failure leaves the statement untouched.
    val serialized = defaultSerializer.toBinary(payload)
    pstmt.setString(1, msg.persistenceId)
    pstmt.setLong(2, msg.sequenceNr)
    pstmt.setBytes(3, serialized)
    pstmt.setObject(4, timestampObject(Time.now))
  case other =>
    Logger.warn(s"Unhandled payload $other will not be persisted")
}
private val deleteEventsSql = "DELETE FROM events WHERE event_id = ? AND seq <= ?"
private val markEventsAsDeletedSql = "UPDATE events SET deleted = true WHERE event_id = ? AND seq <= ?"
private val deleteSystemEventsSql = "DELETE FROM akka_system_events WHERE persistence_id = ? AND seq <= ?"
private val markSystemEventsAsDeletedSql = "UPDATE akka_system_events SET deleted = true WHERE persistence_id = ? AND seq <= ?"

/**
 * Deletes, or marks as deleted, all messages of `persistenceId` up to and including
 * `toSequenceNr`, inside one transaction.
 *
 * @param persistenceId id of the persistent actor; decides which table is affected
 * @param toSequenceNr  inclusive upper bound of sequence numbers to delete
 * @param permanent     `true` removes the rows, `false` only sets their `deleted` flag
 */
override def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = {
  DB.withTransaction { conn =>
    val isDomainEvent = EventActor.isEventStateMessage(persistenceId)
    val sql =
      if (isDomainEvent) {
        if (permanent) deleteEventsSql else markEventsAsDeletedSql
      } else {
        if (permanent) deleteSystemEventsSql else markSystemEventsAsDeletedSql
      }
    val pstmt = conn.prepareStatement(sql)
    // Close the statement explicitly; a pooled connection is not guaranteed to do it for us.
    try {
      // Domain events are keyed by the id extracted from the persistence id,
      // system events by the persistence id itself.
      if (isDomainEvent) {
        pstmt.setObject(1, EventActor.extractId(persistenceId))
      } else {
        pstmt.setString(1, persistenceId)
      }
      pstmt.setLong(2, toSequenceNr)
      pstmt.executeUpdate()
    } finally {
      pstmt.close()
    }
  }
}
/**
* Deprecated per-message deletion hook. Implemented as a no-op: the arguments are ignored
* and nothing is deleted.
*/
@deprecated("deleteMessages will be removed.", since = "2.3.4")
override def deleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Unit = ()
/**
* Deprecated confirmation hook. Implemented as a no-op: the confirmations are ignored and
* nothing is written.
*/
@deprecated("writeConfirmations will be removed, since Channels will be removed.", since = "2.3.4")
override def writeConfirmations(confirmations: Seq[PersistentConfirmation]): Unit = ()
private val selectMaxSeqSql = "SELECT max(seq) FROM events WHERE event_id = ? AND seq >= ?"
private val selectSystemMaxSeqSql = "SELECT max(seq) FROM akka_system_events WHERE persistence_id = ? AND seq >= ?"

/**
 * Reads the highest stored sequence number for `persistenceId`, looking only at rows with
 * `seq >= fromSequenceNr`.
 *
 * @param persistenceId  id of the persistent actor; decides which table is queried
 * @param fromSequenceNr lower bound for the search, also used as the fallback result
 * @return the largest matching `seq`, or `fromSequenceNr` when no row matches
 */
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future {
  DB.withConnection { conn =>
    val isDomainEvent = EventActor.isEventStateMessage(persistenceId)
    val pstmt = conn.prepareStatement(if (isDomainEvent) selectMaxSeqSql else selectSystemMaxSeqSql)
    // Close statement and result set explicitly; a pooled connection may not release them.
    try {
      if (isDomainEvent) {
        pstmt.setObject(1, EventActor.extractId(persistenceId))
      } else {
        pstmt.setString(1, persistenceId)
      }
      pstmt.setLong(2, fromSequenceNr)
      val resultSet = pstmt.executeQuery()
      try {
        if (resultSet.next()) {
          val maxSeq = resultSet.getLong(1)
          // max() over zero rows yields SQL NULL, which getLong reports as 0.
          if (resultSet.wasNull()) fromSequenceNr else maxSeq
        } else {
          fromSequenceNr
        }
      } finally {
        resultSet.close()
      }
    } finally {
      pstmt.close()
    }
  }
}
private val selectEventSql = "SELECT event_id, seq, event_type, org_id, payload, user_id, created_at, deleted FROM events WHERE event_id = ? AND seq >= ? AND seq <= ? ORDER BY seq LIMIT ?"
private val selectSystemEventSql = "SELECT persistence_id, seq, payload, created_at, deleted FROM akka_system_events WHERE persistence_id = ? AND seq >= ? AND seq <= ? ORDER BY seq LIMIT ?"

/**
 * Replays stored messages of `persistenceId` in ascending sequence order, invoking
 * `replayCallback` once per row that could be turned back into a message.
 *
 * @param persistenceId  id of the persistent actor; decides which table is read
 * @param fromSequenceNr inclusive lower bound
 * @param toSequenceNr   inclusive upper bound
 * @param max            maximum number of rows to read (passed as SQL `LIMIT`)
 * @param replayCallback receives each replayed message
 * @return a future that completes once all selected rows have been handed to the callback
 */
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = Future {
  DB.withConnection { conn =>
    val isDomainEvent = EventActor.isEventStateMessage(persistenceId)
    val pstmt = conn.prepareStatement(if (isDomainEvent) selectEventSql else selectSystemEventSql)
    // Close statement and result set explicitly; a pooled connection may not release them.
    try {
      if (isDomainEvent) {
        pstmt.setObject(1, EventActor.extractId(persistenceId))
      } else {
        pstmt.setString(1, persistenceId)
      }
      pstmt.setLong(2, fromSequenceNr)
      pstmt.setLong(3, toSequenceNr)
      pstmt.setLong(4, max)
      val resultSet = pstmt.executeQuery()
      try {
        val readPayload: ResultSet => Try[AnyRef] =
          if (isDomainEvent) eventManagerEvent else systemEvent
        replayResultSet(persistenceId, resultSet, readPayload)(replayCallback)
      } finally {
        resultSet.close()
      }
    } finally {
      pstmt.close()
    }
  }
}
/**
 * Walks `resultSet` and hands every row that can be deserialized to `replayCallback`.
 * Rows whose payload cannot be rebuilt are logged and skipped.
 *
 * `seq` and `deleted` are read by column label rather than position: the two replay queries
 * place `deleted` at different indexes (8 for `events`, 5 for `akka_system_events`), so a
 * fixed index reads the JSON payload column for domain events.
 *
 * @param persistenceId  id the rows belong to; set on each replayed message and used in logs
 * @param resultSet      rows to replay; must expose `seq` and `deleted` columns
 * @param getPayloadFrom rebuilds the payload from the current row
 * @param replayCallback receives each successfully rebuilt message
 */
private def replayResultSet(persistenceId: String, resultSet: ResultSet, getPayloadFrom: ResultSet => Try[AnyRef])(replayCallback: (PersistentRepr) => Unit) = {
  while (resultSet.next()) {
    getPayloadFrom(resultSet) match {
      case Success(pl) =>
        val seqNr = resultSet.getLong("seq")
        val isDeleted = resultSet.getBoolean("deleted")
        replayCallback(PersistentRepr(
          payload = pl,
          sequenceNr = seqNr,
          persistenceId = persistenceId,
          deleted = isDeleted
        ))
      case Failure(err) =>
        Logger.error(s"Failed to deserialize payload from $persistenceId Skipping ...", err)
    }
  }
}
/**
 * This is where you would put code that extracts Event Manager events from database result set.
 *
 * Rebuilds a domain event from the current row of a `selectEventSql` result. Conversion
 * problems (missing or malformed JSON payload, unexpected column types, unknown event type)
 * are returned as `Failure` so the caller can log and skip the row, instead of escaping as
 * exceptions that abort the whole replay.
 */
private val eventManagerEvent: ResultSet => Try[Evt] = resultSet => {
  // Raw column reads stay outside Try so genuine SQL errors still propagate.
  val rawEventId = resultSet.getObject(1)
  val eventType = resultSet.getString(3)
  val orgId = resultSet.getLong(4)
  val rawPayload = resultSet.getString(5)
  val userId = resultSet.getLong(6)
  val rawCreatedAt = resultSet.getObject(7)
  eventType match {
    case EventType.TitleUpdated =>
      Try {
        val json = Json.parse(rawPayload)
        TitleUpdated(
          eventId = rawEventId.asInstanceOf[UUID],
          orgId = orgId,
          userId = userId,
          title = (json \ "title").as[String],
          createdAt = new DateTime(rawCreatedAt)
        )
      }
    case EventType.EventDraftCreated =>
      Try {
        EventDraftCreated(
          eventId = rawEventId.asInstanceOf[UUID],
          orgId = orgId,
          userId = userId,
          createdAt = new DateTime(rawCreatedAt)
        )
      }
    case unknown =>
      Failure(new IllegalArgumentException(s"Unknown event type $unknown"))
  }
}
/**
 * This is de-serialization for Akka sharding messages.
 *
 * Reads the serialized bytes from column 3 of the current row and turns them back into an
 * object with `defaultSerializer`; a deserialization error is returned as `Failure`.
 */
private val systemEvent: ResultSet => Try[AnyRef] = { row =>
  val serialized = row.getBytes(3)
  Try {
    defaultSerializer.fromBinary(serialized)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment