Skip to content

Instantly share code, notes, and snippets.

@l15k4
Created November 1, 2017 00:56
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/cfe3109fb9b65c3cafe0433efc8e9de2 to your computer and use it in GitHub Desktop.
Save l15k4/cfe3109fb9b65c3cafe0433efc8e9de2 to your computer and use it in GitHub Desktop.
import akka.Done
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.MemoryBufferType
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.{Flow, JsonFraming}
import com.typesafe.scalalogging.{LazyLogging, StrictLogging}
import org.backuity.clist.util.Read
import org.backuity.clist.{CliMain, arg, opt}
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology
import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
 * Command-line entry point that migrates the segments of one data source from a source
 * S3 bucket/path to a target S3 bucket/path. The actual copying is delegated to
 * [[S3StorageService]]; this object only declares the CLI arguments, validates them and
 * wires up the actor system, materializer and S3 clients.
 *
 * Arguments (declared below): `source-data-source`, `source-bucket`, `source-bucket-path`,
 * `interval`, `target-data-source`, `target-bucket`, `target-bucket-path`, plus the
 * `parallelism` option which defaults to 16.
 */
object MigrateS3Storage extends CliMain[Unit]("init") with S3CommandBuilder with LazyLogging {

  // Fail fast at start-up when the AWS credential variables are unset (`sys.env(...)` throws
  // NoSuchElementException) or empty. The values are not used any further inside this object.
  private[this] lazy val s3Id = sys.env("HADOOP_AWS_ACCESS_KEY_ID")
  private[this] lazy val s3Key = sys.env("HADOOP_AWS_SECRET_ACCESS_KEY")
  require(s3Id.nonEmpty, "HADOOP_AWS_ACCESS_KEY_ID must not be empty")
  require(s3Key.nonEmpty, "HADOOP_AWS_SECRET_ACCESS_KEY must not be empty")

  // Reads an interval from its textual form with Joda-Time, using the ISO chronology in UTC.
  // It is implicit so that it is picked up by `arg[Interval]` below and by
  // `S3StorageService.migrate`, which takes a `Read[Interval]` implicitly.
  private[this] implicit val intervalReader: Read[Interval] = Read.reads("interval") { interval =>
    new Interval(interval, ISOChronology.getInstanceUTC)
  }

  // NOTE: these must stay `var`s in this order; they are populated by the CLI library.
  var sourceDataSource = arg[String](name = "source-data-source")
  var sourceBucket = arg[String](name = "source-bucket")
  var sourceBucketPath = arg[String](name = "source-bucket-path")
  var interval = arg[Interval](name = "interval")
  var targetDataSource = arg[String](name = "target-data-source")
  var targetBucket = arg[String](name = "target-bucket")
  var targetBucketPath = arg[String](name = "target-bucket-path")
  var parallelism = opt[Int](default = 16)

  /**
   * Validates the parsed arguments and runs the migration.
   *
   * Throws `IllegalArgumentException` (via `require`) when a bucket path starts with '/' or does
   * not end with '/', when a bucket path contains its own data source name as a path part, or
   * when source and target describe exactly the same location.
   */
  override def run = {
    require(!sourceBucketPath.startsWith("/") && sourceBucketPath.endsWith("/"), "S3 path cannot start with '/' and must end with it !!!")
    require(!targetBucketPath.startsWith("/") && targetBucketPath.endsWith("/"), "S3 path cannot start with '/' and must end with it !!!")
    // S3StorageService locates the data source inside a key by searching for the first path part
    // equal to the data source name, so the configured path itself must not contain that name.
    require(!sourceBucketPath.trim.split("/").contains(sourceDataSource), "Path cannot contain DataSource string as one of its parts !!!")
    require(!targetBucketPath.trim.split("/").contains(targetDataSource), "Path cannot contain DataSource string as one of its parts !!!")
    // A location is identified by bucket + path + data source. Comparing only the paths would
    // reject valid migrations (same path in another bucket, or another data source name under
    // the same bucket and path), so the whole triple is compared.
    require(
      (sourceBucket, sourceBucketPath, sourceDataSource) != (targetBucket, targetBucketPath, targetDataSource),
      "Are you trying to migrate deep storage to itself ??? Go have some sleep !"
    )

    Common.closableActorSystem("migrate-s3-storage") { implicit system =>
      implicit val materializer = SupervisedMaterializer()
      closableS3Driver { implicit s3 =>
        implicit val s3Client = s3.alpakka(MemoryBufferType)
        S3StorageService(sourceDataSource, targetDataSource, interval, sourceBucket, sourceBucketPath, targetBucket, targetBucketPath, parallelism)
          .migrate
          .andThen {
            // Only report the outcome; the resulting future is handed back to closableS3Driver.
            case Success(count) =>
              logger.info(s"$count segments successfully migrated ...")
            case Failure(ex) =>
              logger.error("Migration failed due to", ex)
          }(Implicits.global)
      }
    }
  }
}
/**
 * A segment together with the S3 key of the descriptor object it was read from.
 *
 * @param key     S3 key of the descriptor object
 * @param segment segment parsed from that descriptor
 */
case class Descriptor(key: String, segment: Segment) {

  /** S3 key stored in the segment's load spec, i.e. the object the descriptor points to. */
  def indexKey: String = {
    val spec = segment.loadSpec
    spec.key
  }
}
/**
 * Copies segments of `sourceDataSource` found under `sourceBucketPath` in `sourceBucket` to
 * `targetBucket`, rewriting S3 keys, the data source name and the segment identifier so that
 * they refer to `targetDataSource` under `targetBucketPath`.
 *
 * @param sourceDataSource data source name to look for in source keys and identifiers
 * @param targetDataSource data source name written into target keys, identifiers and segments
 * @param interval         only segments whose interval is contained in this one are migrated
 * @param sourceBucket     bucket that is listed and downloaded from
 * @param sourceBucketPath key prefix in the source bucket, directly followed by the data source name
 * @param targetBucket     bucket that is uploaded to
 * @param targetBucketPath key prefix placed in front of the rewritten keys
 * @param parallelism      breadth of the descriptor reads and of the concurrent copies
 */
case class S3StorageService(
    sourceDataSource: String,
    targetDataSource: String,
    interval: Interval,
    sourceBucket: String,
    sourceBucketPath: String,
    targetBucket: String,
    targetBucketPath: String,
    parallelism: Int
) extends StrictLogging {

  /**
   * Rebases a source key onto the target location: everything in front of the first path part
   * equal to `sourceDataSource` is dropped, that part is replaced with `targetDataSource`, and
   * the parts of `targetBucketPath` are prepended.
   *
   * Fails with a `RuntimeException` (via `sys.error`) when no path part equals `sourceDataSource`.
   */
  private[this] def modifyS3Key(key: String): String = {
    val fromDataSource = key.trim.split('/').dropWhile(_ != sourceDataSource)
    if (fromDataSource.isEmpty) {
      sys.error(s"Key is not compatible with current script implementation :\n$key")
    } else {
      val targetPrefix = targetBucketPath.split('/')
      (targetPrefix ++ fromDataSource.updated(0, targetDataSource)).mkString("/")
    }
  }

  /** Replaces every occurrence of `sourceDataSource` in the identifier with `targetDataSource`. */
  private[this] def modifyIdentifier(identifier: String): String =
    identifier.replace(sourceDataSource, targetDataSource)

  /**
   * Builds the target-side descriptor for a source descriptor.
   *
   * Returns `Some` only when the descriptor key, the load spec key and the identifier all
   * actually change; otherwise the descriptor is logged as unmodifiable and `None` is returned.
   * A key that does not contain `sourceDataSource` makes `modifyS3Key` throw instead.
   */
  protected[script] def modifyDescriptor(descriptor: Descriptor): Option[Descriptor] = {
    def skipped(): Option[Descriptor] = {
      logger.error(s"Descriptor cannot be modified, skipping !!!\n$descriptor")
      None
    }

    descriptor match {
      case Descriptor(descriptorKey, segment) =>
        // Lazy so that, as with a short-circuiting guard, later rewrites are only attempted
        // once the earlier comparisons have succeeded.
        val movedDescriptorKey = modifyS3Key(descriptorKey)
        lazy val movedIndexKey = modifyS3Key(segment.loadSpec.key)
        lazy val movedIdentifier = modifyIdentifier(segment.identifier)

        val everythingChanged =
          movedDescriptorKey != descriptorKey &&
            movedIndexKey != segment.loadSpec.key &&
            movedIdentifier != segment.identifier

        if (everythingChanged) {
          logger.info(s"Processing $descriptorKey ...")
          val movedLoadSpec = segment.loadSpec.copy(bucket = targetBucket, key = movedIndexKey)
          val movedSegment = segment.copy(
            dataSource = targetDataSource,
            loadSpec = movedLoadSpec,
            identifier = movedIdentifier
          )
          Some(Descriptor(movedDescriptorKey, movedSegment))
        } else {
          skipped()
        }
      case _ =>
        skipped()
    }
  }

  /** Serializes a segment to a JSON string with `ObjMapper.miniWriter`. */
  protected[script] def writeSegment(segment: Segment): String = {
    ObjMapper.miniWriter.writeValueAsString(segment)
  }

  /** Parses a segment from its JSON string with `ObjMapper`. */
  protected[script] def readSegment(segment: String): Segment = {
    ObjMapper.readValue[Segment](segment)
  }

  /**
   * Source that downloads the descriptor stored under `descriptorKey` in the source bucket,
   * frames it into JSON objects and emits each one as a [[Descriptor]].
   */
  private[this] def segmentReaderFlow(descriptorKey: String)(implicit s3Client: S3Client) = {
    // Upper bound passed to the JSON object scanner for a single descriptor object.
    val maxDescriptorLength = 32768
    s3Client
      .download(sourceBucket, descriptorKey)
      .via(JsonFraming.objectScanner(maxDescriptorLength))
      .map(json => Descriptor(descriptorKey, readSegment(json.utf8String)))
  }

  /**
   * Flow that rewrites each incoming descriptor, drops the ones that cannot be rewritten, and
   * for the rest copies the index object to the target bucket before writing the rewritten
   * descriptor there. Emits one `Done` per copied segment, in completion order.
   */
  private[this] def copyFlow(implicit s3: S3Driver, alpakka: S3Client, m: ActorMaterializer) = {
    Flow[Descriptor]
      .map(source => source -> modifyDescriptor(source))
      .collect { case (source, Some(target)) => source -> target }
      .mapAsyncUnordered(parallelism) { case (source, target) =>
        recover(3, 1.minute) {
          val indexBytes = alpakka.download(sourceBucket, source.indexKey)
          val indexUpload = alpakka.multipartUpload(targetBucket, target.indexKey)
          indexBytes
            .runWith(indexUpload)
            .map { _ =>
              // The descriptor is written only after the index upload has completed.
              s3.putObject(targetBucket, target.key, writeSegment(target.segment))
              Done.getInstance()
            }(Implicits.global)
        }
      }
  }

  /**
   * Runs the migration: lists objects under `sourceBucketPath + sourceDataSource + "/"`, reads
   * every key ending in `descriptor.json`, keeps the segments whose interval lies within
   * `interval`, and copies them with `copyFlow`.
   *
   * @return a future holding the number of segments that were copied
   */
  def migrate(implicit s3: S3Driver, alpakka: S3Client, m: ActorMaterializer, intervalReader: Read[Interval]): Future[Int] = {
    val descriptorKeys =
      s3.objSummarySource(sourceBucket, s"$sourceBucketPath$sourceDataSource/")
        .map(_.getKey)
        .filter(_.endsWith("descriptor.json"))

    val descriptors =
      descriptorKeys.flatMapMerge(parallelism, { descriptorKey => segmentReaderFlow(descriptorKey) })

    val withinInterval = descriptors.filter { candidate =>
      val segmentInterval = intervalReader.reads(candidate.segment.interval)
      interval.contains(segmentInterval)
    }

    withinInterval
      .via(copyFlow)
      .runFold(0)((migrated, _) => migrated + 1)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment