Created
May 31, 2021 06:06
-
-
Save ptrdom/b81fb971a7b82d1e135b6f64cb5e2944 to your computer and use it in GitHub Desktop.
Akka Persistence Cassandra client that writes events directly to the persistence layer, bypassing Lagom's persistent entities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer | |
import java.time.Instant | |
import java.time.temporal.ChronoUnit | |
import java.util.UUID | |
import akka.Done | |
import akka.actor.ActorSystem | |
import akka.serialization.SerializationExtension | |
import akka.serialization.SerializerWithStringManifest | |
import com.datastax.driver.core.BatchStatement | |
import com.datastax.driver.core.Cluster | |
import com.datastax.driver.core.utils.UUIDs | |
import com.lightbend.lagom.scaladsl.persistence.AggregateEvent | |
import com.lightbend.lagom.scaladsl.persistence.AggregateEventShards | |
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.ExecutionContextExecutor | |
/**
 * Writes events directly into the Akka Persistence Cassandra journal tables
 * (`messages`, `tag_views`, `tag_write_progress`, `tag_scanning`) without going
 * through a Lagom `PersistentEntity`.
 *
 * Connection settings are read from the `cassandra-journal` section of the actor
 * system's configuration. The Cassandra cluster connection is opened eagerly in
 * the constructor and closed when the actor system terminates.
 *
 * @param actorSystem actor system supplying configuration, the serialization
 *                    extension, the dispatcher and the termination hook
 */
class AkkaPersistenceCassandraClient(actorSystem: ActorSystem) {

  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  val serialization: akka.serialization.Serialization = SerializationExtension(actorSystem)

  val config: com.typesafe.config.Config = actorSystem.settings.config

  // Reading the setting as a list accepts both a real HOCON list and the
  // numerically keyed object form (`contact-points.0 = ...`).
  private val contactPoints: List[String] =
    config.getStringList("cassandra-journal.contact-points").asScala.toList
  require(contactPoints.nonEmpty, "cassandra-journal.contact-points must contain at least one host")

  /** First configured contact point. */
  val contactPoint: String = contactPoints.head

  val port: Int = config.getInt("cassandra-journal.port")

  val keyspace: String = config.getString("cassandra-journal.keyspace")

  // Number of events per journal partition. Falls back to 500000 when the key is
  // not configured.
  private val targetPartitionSize: Long = {
    val path = "cassandra-journal.target-partition-size"
    if (config.hasPath(path)) config.getLong(path) else 500000L
  }

  // The cluster is kept so that it can be closed on shutdown; closing only the
  // session would leave the cluster's own resources open.
  private val cluster: Cluster =
    contactPoints
      .foldLeft(Cluster.builder())((builder, host) => builder.addContactPoint(host))
      .withPort(port)
      .build()

  implicit val session: com.datastax.driver.core.Session = cluster.connect()

  // Passed by-name rather than as a `() => ...` literal so the call cannot be
  // resolved to the wrong `registerOnTermination` overload.
  // Closing the cluster also closes the sessions created from it.
  actorSystem.registerOnTermination(cluster.close())

  private val writeMessagesStatement = session.prepare(
    s"""
      INSERT INTO ${keyspace}.messages (
        persistence_id,
        partition_nr,
        sequence_nr,
        timestamp,
        timebucket,
        writer_uuid,
        ser_id,
        ser_manifest,
        event_manifest,
        event,
        used,
        tags
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, true, ?)
    """
  )

  private val writeTagsStatement = session.prepare(
    s"""
      INSERT INTO ${keyspace}.tag_views (
        tag_name,
        timebucket,
        timestamp,
        tag_pid_sequence_nr,
        event,
        event_manifest,
        persistence_id,
        sequence_nr,
        ser_id,
        ser_manifest,
        writer_uuid
      ) VALUES (?,?,?,?,?,?,?,?,?,?,?)
    """
  )

  private val writeTagProgressStatement = session.prepare(
    s"""
      INSERT INTO ${keyspace}.tag_write_progress (
        persistence_id,
        tag,
        offset,
        sequence_nr,
        tag_pid_sequence_nr
      ) VALUES (?,?,?,?,?)
    """
  )

  private val writeTagScanningStatement = session.prepare(
    s"""
      INSERT INTO ${keyspace}.tag_scanning (
        persistence_id,
        sequence_nr
      ) VALUES (?,?)
    """
  )

  /**
   * Serializes `event` and inserts it, together with its tag bookkeeping rows,
   * in a single Cassandra batch. The call blocks until the batch has executed.
   *
   * @param entityType entity whose `entityTypeName` prefixes the persistence id
   * @param entityId   entity identifier; also selects the shard tag
   * @param sequenceNr sequence number to store the event under
   * @param tag        sharded tag definition used to derive the event's tag
   * @param event      event to serialize and store
   * @tparam Event     concrete aggregate event type
   * @return `Done` once the batch has been executed
   */
  def writeEvent[Event <: AggregateEvent[Event]](
      entityType: PersistentEntity,
      entityId: String,
      sequenceNr: Long,
      tag: AggregateEventShards[Event],
      event: Event
  ): Done = {
    val persistenceId = s"${entityType.entityTypeName}|$entityId"
    val partitionNr = (sequenceNr - 1L) / targetPartitionSize

    // One time-based UUID serves as the event timestamp and as the tag offset.
    val timestamp = UUIDs.timeBased()
    // NOTE(review): buckets are truncated to the hour; presumably this has to match
    // the bucket size the events-by-tag queries are configured with - confirm.
    val timeBucket = Instant
      .ofEpochMilli(UUIDs.unixTimestamp(timestamp))
      .truncatedTo(ChronoUnit.HOURS)
      .toEpochMilli
    val writerUUID = UUID.randomUUID().toString

    val serializer = serialization.findSerializerFor(event)
    // Derive the manifest for every serializer kind instead of casting, which
    // would fail for serializers that are not SerializerWithStringManifest.
    val serializerManifest = serializer match {
      case withManifest: SerializerWithStringManifest => withManifest.manifest(event)
      case other if other.includeManifest             => event.getClass.getName
      case _                                          => ""
    }
    val serializerId = serializer.identifier
    val serializedEvent = ByteBuffer.wrap(serializer.toBinary(event))
    val eventManifest = ""

    val tagName = tag.forEntityId(entityId).tag
    val tags = Set(tagName).asJava

    val messagesInsert = writeMessagesStatement
      .bind()
      .setString("persistence_id", persistenceId)
      .setLong("partition_nr", partitionNr)
      .setLong("sequence_nr", sequenceNr)
      .setUUID("timestamp", timestamp)
      .setString("timebucket", timeBucket.toString)
      .setString("writer_uuid", writerUUID)
      .setInt("ser_id", serializerId)
      .setString("ser_manifest", serializerManifest)
      .setString("event_manifest", eventManifest)
      .setBytes("event", serializedEvent)
      .setSet("tags", tags, classOf[String])

    // NOTE(review): tag_pid_sequence_nr is set to the journal sequence number, which
    // presumably only holds when every event of this entity carries the same tag.
    val tagViewInsert = writeTagsStatement
      .bind()
      .setString("tag_name", tagName)
      .setLong("timebucket", timeBucket)
      .setUUID("timestamp", timestamp)
      .setLong("tag_pid_sequence_nr", sequenceNr)
      .setBytes("event", serializedEvent)
      .setString("event_manifest", eventManifest)
      .setString("persistence_id", persistenceId)
      .setLong("sequence_nr", sequenceNr)
      .setInt("ser_id", serializerId)
      .setString("ser_manifest", serializerManifest)
      .setString("writer_uuid", writerUUID)

    val tagProgressInsert = writeTagProgressStatement
      .bind()
      .setString("persistence_id", persistenceId)
      .setString("tag", tagName)
      .setUUID("offset", timestamp)
      .setLong("sequence_nr", sequenceNr)
      .setLong("tag_pid_sequence_nr", sequenceNr)

    val tagScanningInsert = writeTagScanningStatement
      .bind()
      .setString("persistence_id", persistenceId)
      .setLong("sequence_nr", sequenceNr)

    val batch = new BatchStatement()
    batch.add(messagesInsert)
    batch.add(tagViewInsert)
    batch.add(tagProgressInsert)
    batch.add(tagScanningInsert)
    session.execute(batch)

    Done
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment