Skip to content

Instantly share code, notes, and snippets.

@vasily-kirichenko
Created December 4, 2018 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vasily-kirichenko/c39926d7ea11b79a61055c7b54071036 to your computer and use it in GitHub Desktop.
Save vasily-kirichenko/c39926d7ea11b79a61055c7b54071036 to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor.Scheduler
import akka.actor.typed.{ActorRef, _}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
import akka.cluster.ddata.{LWWMap, LWWMapKey, ReplicatedData}
import akka.cluster.ddata.typed.scaladsl.{Replicator, _}
import akka.stream._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Random
/**
 * Groups three byte sequences named after the MD5, SHA-1 and SHA-256 digest algorithms.
 *
 * No validation of the sequence lengths is performed here.
 *
 * @param md5    bytes of the MD5 value
 * @param sha1   bytes of the SHA-1 value
 * @param sha256 bytes of the SHA-256 value
 */
case class Hashes(md5: Seq[Byte], sha1: Seq[Byte], sha256: Seq[Byte])
/**
 * Metadata record describing a file, usable as Akka Distributed Data replicated data.
 *
 * @param hashes     hash values of the file
 * @param size       size value of the file (presumably in bytes; the unit is not established here)
 * @param categories integer category identifiers
 * @param created    creation timestamp; also used to resolve conflicts in [[merge]]
 */
case class FileMeta(
  hashes: Hashes,
  size: Long,
  categories: Seq[Int],
  created: DateTime
) extends ReplicatedData {
  override type T = FileMeta

  /**
   * Last-writer-wins merge keyed on `created`.
   *
   * `ReplicatedData.merge` is expected to be commutative, associative and idempotent so that all
   * replicas converge. Unconditionally returning `that` is not commutative, so the record with the
   * later `created` instant is chosen instead. When both instants are equal the record with the
   * larger `hashCode` wins, so both sides of a merge select the same value (two distinct records
   * with equal instant and equal hash code remain an unresolved, extremely unlikely, corner case).
   *
   * @param that the other replica's value
   * @return the winning record, either `this` or `that`
   */
  override def merge(that: FileMeta): FileMeta =
    if (created.isAfter(that.created)) this
    else if (that.created.isAfter(created)) that
    else if (hashCode >= that.hashCode) this
    else that
}
/** Asynchronous storage of [[FileMeta]] records addressed by a bundle URN string. */
trait FileMetaStorage {

  /**
   * Stores `meta` under `bundleUrn`.
   *
   * @param bundleUrn key of the record
   * @param meta      record to store
   * @return a future completed when the implementation has handled the request
   */
  def add(bundleUrn: String, meta: FileMeta): Future[Unit]

  /**
   * Looks up the record stored under `bundleUrn`.
   *
   * @param bundleUrn key of the record
   * @return a future holding the record if the implementation found one, otherwise `None`
   */
  def get(bundleUrn: String): Future[Option[FileMeta]]
}
/**
 * [[FileMetaStorage]] that keeps all records in a single `LWWMap[String, FileMeta]` stored under
 * the Distributed Data key `"file-meta"`, talking to the replicator via the ask pattern.
 *
 * Writes use `WriteLocal` and reads use `ReadLocal`; every ask is bounded by a 5 second timeout.
 *
 * @param replicator typed replicator actor that receives the `Update` / `Get` commands
 * @param ctx        execution context used to transform the replicator replies
 * @param scheduler  scheduler required by the ask pattern
 * @param node       cluster node required for modifying the `LWWMap`
 */
class DDataFileMetaStorage()(implicit replicator: ActorRef[Replicator.Command], ctx: ExecutionContext, scheduler: Scheduler, node: Cluster)
  extends FileMetaStorage {

  private val log = LoggerFactory.getLogger(getClass)
  private val Key = LWWMapKey[String, FileMeta]("file-meta")
  private implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Adds (or overwrites) the entry `bundleUrn -> meta` in the replicated map.
   *
   * @return a future that succeeds only when the replicator answers with `UpdateSuccess`; any
   *         other reply fails the future with an `IllegalStateException` instead of being
   *         reported as success. An ask timeout fails the future as before.
   */
  override def add(bundleUrn: String, meta: FileMeta): Future[Unit] = {
    (replicator ? ((ref: ActorRef[Replicator.UpdateResponse[_]]) =>
      Replicator.Update(Key, LWWMap.empty[String, FileMeta], Replicator.WriteLocal, ref, Some(meta)) {
        m => m + (bundleUrn -> meta)
      }))
      .flatMap {
        case _: Replicator.UpdateSuccess[_] =>
          Future.successful(())
        case failure =>
          // Failure replies must not be turned into a successful Unit.
          Future.failed(new IllegalStateException(s"Replicator update for bundle '$bundleUrn' failed: $failure"))
      }
  }

  /**
   * Reads the replicated map and looks up `bundleUrn` in it.
   *
   * @return `Some(meta)` when the map exists and contains the key; `None` when the map does not
   *         exist yet, does not contain the key, or the replicator reported a read failure
   *         (which is logged).
   */
  override def get(bundleUrn: String): Future[Option[FileMeta]] = {
    (replicator ? ((ref: ActorRef[Replicator.GetResponse[_]]) => Replicator.Get(Key, Replicator.ReadLocal, ref)))
      .map {
        case resp@Replicator.GetSuccess(Key, _) =>
          resp.get(Key).get(bundleUrn)
        case _: Replicator.NotFound[_] =>
          // Nothing has been stored under the key yet.
          None
        case failure =>
          // Keep the best-effort `None` result, but make the failure visible.
          log.error(s"Replicator read for bundle '$bundleUrn' failed: $failure")
          None
      }
  }
}
/**
 * Entry point: starts the "ddata-test" actor system and, when the `generate-data` config flag is
 * set, inserts 1,000,000 randomly generated [[FileMeta]] records into a [[DDataFileMetaStorage]].
 */
object Main {

  /** Returns a new array of `length` bytes filled by `Random.nextBytes`. */
  private def randomBytes(length: Int): Array[Byte] = {
    val bytes = new Array[Byte](length)
    Random.nextBytes(bytes)
    bytes
  }

  /**
   * Loads the configuration, logs the cluster endpoint and seed nodes, wires up the actor system,
   * cluster and replicator, and optionally generates test data.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val log = LoggerFactory.getLogger(getClass)
    val config = ConfigFactory.load()
    log.info(s"Akka cluster listening ${config.getString("akka.remote.netty.tcp.hostname")}:${config.getString("akka.remote.netty.tcp.port")}, " +
      s"seeds: ${config.getList("akka.cluster.seed-nodes").unwrapped().asScala.toList}")

    implicit val system = ActorSystem(Behaviors.empty[NotUsed], "ddata-test", config)
    implicit val untypedSystem = system.toUntyped
    implicit val ctx = system.executionContext
    implicit val scheduler = system.scheduler
    implicit val cluster = Cluster(untypedSystem)
    implicit val replicator = DistributedData(system).replicator

    val fileMetaStorage = new DDataFileMetaStorage

    if (config.getBoolean("generate-data")) {
      for (n <- 1 to 1000000) {
        // Array lengths match the byte sizes of MD5 (16), SHA-1 (20) and SHA-256 (32) digests.
        val md5 = randomBytes(16)
        val sha1 = randomBytes(20)
        val sha256 = randomBytes(32)
        val cats = Seq.fill(3)(Random.nextInt())
        val meta = FileMeta(Hashes(md5, sha1, sha256), Random.nextLong(), cats, DateTime.now())
        // Inserts are fire-and-forget, but a failed future (e.g. an ask timeout) must not vanish silently.
        fileMetaStorage.add(Random.nextString(50), meta)
          .failed
          .foreach(e => log.error(s"Failed to add entry #$n", e))
        if (n % 1000 == 0) println(s"$n added")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment