Skip to content

Instantly share code, notes, and snippets.

@ptrdom
Created May 31, 2021 06:06
Show Gist options
  • Save ptrdom/b81fb971a7b82d1e135b6f64cb5e2944 to your computer and use it in GitHub Desktop.
Save ptrdom/b81fb971a7b82d1e135b6f64cb5e2944 to your computer and use it in GitHub Desktop.
Akka Persistence Cassandra client for interaction with persistence layer with bypass of Lagom's entities
import java.nio.ByteBuffer
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import com.datastax.driver.core.BatchStatement
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.utils.UUIDs
import com.lightbend.lagom.scaladsl.persistence.AggregateEvent
import com.lightbend.lagom.scaladsl.persistence.AggregateEventShards
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutor
class AkkaPersistenceCassandraClient(actorSystem: ActorSystem) {
implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
val serialization = SerializationExtension(actorSystem)
val config = actorSystem.settings.config
val contactPoint = config.getString("cassandra-journal.contact-points.0")
val port = config.getInt("cassandra-journal.port")
val keyspace = config.getString("cassandra-journal.keyspace")
implicit val session = Cluster.builder
.addContactPoint(contactPoint)
.withPort(port)
.build
.connect()
actorSystem.registerOnTermination(() => session.close())
private val writeMessagesStatement = session.prepare(
s"""
INSERT INTO ${keyspace}.messages (
persistence_id,
partition_nr,
sequence_nr,
timestamp,
timebucket,
writer_uuid,
ser_id,
ser_manifest,
event_manifest,
event,
used,
tags
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, true, ?)
"""
)
private val writeTagsStatement = session.prepare(
s"""
INSERT INTO ${keyspace}.tag_views (
tag_name,
timebucket,
timestamp,
tag_pid_sequence_nr,
event,
event_manifest,
persistence_id,
sequence_nr,
ser_id,
ser_manifest,
writer_uuid
) VALUES (?,?,?,?,?,?,?,?,?,?,?)
"""
)
private val writeTagProgressStatement = session.prepare(
s"""
INSERT INTO ${keyspace}.tag_write_progress (
persistence_id,
tag,
offset,
sequence_nr,
tag_pid_sequence_nr
) VALUES (?,?,?,?,?)
"""
)
private val writeTagScanningStatement = session.prepare(
s"""
INSERT INTO ${keyspace}.tag_scanning (
persistence_id,
sequence_nr
) VALUES (?,?)
"""
)
def writeEvent[Event <: AggregateEvent[Event]](
entityType: PersistentEntity,
entityId: String,
sequenceNr: Long,
tag: AggregateEventShards[Event],
event: Event
): Done = {
val persistenceId = entityType.entityTypeName + "|" + entityId
val partitionNr = ((sequenceNr - 1L) / 500000).asInstanceOf[java.lang.Long]
val sequenceNrJava = sequenceNr.asInstanceOf[java.lang.Long]
val timestamp = UUIDs.timeBased()
val timeBucket = Instant
.ofEpochMilli(UUIDs.unixTimestamp(timestamp))
.truncatedTo(ChronoUnit.HOURS)
.toEpochMilli
val timeBucketString = timeBucket.toString
val writerUUID = UUID.randomUUID().toString
val (serializerId, serializerManifest, serializedEvent) = {
val serializer = serialization.findSerializerFor(event).asInstanceOf[SerializerWithStringManifest]
(
serializer.identifier.asInstanceOf[java.lang.Integer],
serializer.manifest(event),
ByteBuffer.wrap(serializer.toBinary(event))
)
}
val eventManifest = ""
val tagName = tag.forEntityId(entityId).tag
val tags = Set(tagName).asJava
val boundWriteMessagesStatement = writeMessagesStatement.bind
boundWriteMessagesStatement.setString("persistence_id", persistenceId)
boundWriteMessagesStatement.setLong("partition_nr", partitionNr)
boundWriteMessagesStatement.setLong("sequence_nr", sequenceNrJava)
boundWriteMessagesStatement.setUUID("timestamp", timestamp)
boundWriteMessagesStatement.setString("timebucket", timeBucketString)
boundWriteMessagesStatement.setString("writer_uuid", writerUUID)
boundWriteMessagesStatement.setInt("ser_id", serializerId)
boundWriteMessagesStatement.setString("ser_manifest", serializerManifest)
boundWriteMessagesStatement.setString("event_manifest", eventManifest)
boundWriteMessagesStatement.setBytes("event", serializedEvent)
boundWriteMessagesStatement.setSet("tags", tags, classOf[String])
val boundWriteTagsStatement = writeTagsStatement.bind
boundWriteTagsStatement.setString("tag_name", tagName)
boundWriteTagsStatement.setLong("timebucket", timeBucket)
boundWriteTagsStatement.setUUID("timestamp", timestamp)
boundWriteTagsStatement.setString("persistence_id", persistenceId)
boundWriteTagsStatement.setLong("tag_pid_sequence_nr", sequenceNrJava)
boundWriteTagsStatement.setBytes("event", serializedEvent)
boundWriteTagsStatement.setString("ser_manifest", serializerManifest)
boundWriteTagsStatement.setLong("sequence_nr", sequenceNrJava)
boundWriteTagsStatement.setInt("ser_id", serializerId)
boundWriteTagsStatement.setString("event_manifest", eventManifest)
boundWriteTagsStatement.setString("writer_uuid", writerUUID)
val boundWriteTagProgressStatement = writeTagProgressStatement.bind
boundWriteTagProgressStatement.setString("persistence_id", persistenceId)
boundWriteTagProgressStatement.setString("tag", tagName)
boundWriteTagProgressStatement.setUUID("offset", timestamp)
boundWriteTagProgressStatement.setLong("sequence_nr", sequenceNrJava)
boundWriteTagProgressStatement.setLong("tag_pid_sequence_nr", sequenceNrJava)
val boundWriteTagScanningStatement = writeTagScanningStatement.bind
boundWriteTagScanningStatement.setString("persistence_id", persistenceId)
boundWriteTagScanningStatement.setLong("sequence_nr", sequenceNrJava)
val batch = new BatchStatement()
batch.add(boundWriteMessagesStatement)
batch.add(boundWriteTagsStatement)
batch.add(boundWriteTagProgressStatement)
batch.add(boundWriteTagScanningStatement)
session.execute(batch)
Done
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment