Skip to content

Instantly share code, notes, and snippets.

@choffmeister
Created March 25, 2020 08:33
Show Gist options
  • Save choffmeister/d5740b71e72ae4c2dc0484e456f0ddaa to your computer and use it in GitHub Desktop.
Save choffmeister/d5740b71e72ae4c2dc0484e456f0ddaa to your computer and use it in GitHub Desktop.
package io.airfocus.workspaceservice.managers
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/** Base class for components that run migrations and log their lifecycle.
  *
  * Each migration logs a "started" line, followed by either a "succeeded" line carrying the
  * number of processed elements or a "failed" line carrying the error. Concrete subclasses
  * supply the [[akka.actor.ActorSystem]].
  */
abstract class MigrationManager extends LazyLogging {
  import MigrationManager.{Migration, StreamingMigration}

  /** Actor system supplied by the concrete subclass; implicitly in scope wherever the
    * streaming migrations below are run.
    */
  implicit val actorSystem: ActorSystem

  /** Execution context for the future callbacks in this class, taken from the actor system's
    * dispatcher.
    *
    * Declared as a `def` rather than a strict `val`: this class's initializer runs before the
    * initializer of any subclass, so a strict `val` would read `actorSystem` while it is still
    * `null` whenever the subclass defines it as a `val` in its body, and construction would
    * fail with a `NullPointerException`. A `def` is resolved on use and can still be
    * overridden by a `val` or `lazy val`.
    */
  implicit def executionContext: ExecutionContext = actorSystem.dispatcher

  /** Entry point for running migrations.
    *
    * The default implementation runs nothing and completes immediately; subclasses can
    * override it to run their own migrations.
    *
    * @return a future that is already completed with `Done`
    */
  def run(): Future[Done] = Future.successful(Done)

  /** Runs a one-shot migration and logs its outcome.
    *
    * @param migration by-name pair of migration id and a future yielding the number of
    *                  processed elements; it is evaluated exactly once
    * @return a future that completes with `Done` on success, or fails with the migration's
    *         error
    */
  protected def runMigration(migration: => Migration): Future[Done] = {
    // Destructure into vals so the by-name argument is evaluated only once.
    val (id, future) = migration
    logMigration(id, future)
  }

  /** Runs a streaming migration, logging the running total periodically and the outcome at
    * the end.
    *
    * Every element emitted by the source is added to a running total. If the source completes
    * without emitting anything, the reported total is `0`.
    *
    * @param migration        by-name pair of migration id and a source of per-element counts;
    *                         it is evaluated exactly once
    * @param progressInterval delay before the first progress log line and between subsequent
    *                         ones; defaults to 3 seconds
    * @return a future that completes with `Done` on success, or fails with the stream's error
    */
  protected def runStreamingMigration(
      migration: => StreamingMigration,
      progressInterval: FiniteDuration = 3.seconds
  ): Future[Done] = {
    val (id, source) = migration
    val future =
      source
        // Real elements become Some(count); the merged ticks below are None.
        .map(Option.apply)
        // eagerComplete = true: the tick source never completes on its own, so the merged
        // stream has to finish as soon as the migration source does.
        .merge(Source.tick(progressInterval, progressInterval, None), eagerComplete = true)
        .statefulMapConcat(() => {
          // Running total, local to the function created by this factory.
          var processed = 0L
          element =>
            element match {
              case Some(additionalCount) =>
                processed += additionalCount
                // Emit the new total so the last element of the stream is the final count.
                processed :: Nil
              case None =>
                // Tick: report progress and emit nothing downstream.
                logger.info(s"Migration $id processed $processed elements so far")
                Nil
            }
        })
        .runWith(Sink.lastOption)
        .map(_.getOrElse(0L))
    logMigration(id, future)
  }

  /** Logs the start of a migration and, once `future` completes, its success or failure.
    *
    * Note that `future` has already been created by the caller when the "started" line is
    * written.
    *
    * @param id     migration identifier used in every log line
    * @param future future yielding the number of processed elements
    * @return a future that completes with `Done` on success; a failure is logged and then
    *         propagated unchanged
    */
  private def logMigration(id: String, future: Future[Long]): Future[Done] = {
    logger.info(s"Migration $id started")
    future.transform {
      case Success(count) =>
        logger.info(s"Migration $id succeeded after processing $count elements")
        Success(Done)
      case Failure(err) =>
        logger.error(s"Migration $id failed", err)
        Failure(err)
    }
  }
}
/** Type aliases describing the migrations accepted by the `MigrationManager` class. */
object MigrationManager {

  /** A streaming migration: an identifier used in log messages, paired with a source whose
    * elements are counts that `runStreamingMigration` adds up into a running total. The
    * source's own materialized value is not read there; only the sink's result is kept.
    */
  type StreamingMigration = (String, Source[Long, Done])

  /** A one-shot migration: an identifier used in log messages, paired with a future that
    * yields the number of processed elements.
    */
  type Migration = (String, Future[Long])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment