import akka.Done
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.MemoryBufferType
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.scaladsl.{Flow, JsonFraming}
import com.typesafe.scalalogging.{LazyLogging, StrictLogging}
import org.backuity.clist.util.Read
import org.backuity.clist.{CliMain, arg, opt}
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
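
// CLI tool that migrates S3 "deep storage" between buckets/paths: it rewrites the data source
// name in every segment descriptor, load spec and identifier, and copies the corresponding
// index objects to the target location (see S3StorageService below).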
object MigrateS3Storage extends CliMain[Unit]("init") with S3CommandBuilder with LazyLogging {

  private[this] lazy val s3Id = sys.env("HADOOP_AWS_ACCESS_KEY_ID")
  private[this] lazy val s3Key = sys.env("HADOOP_AWS_SECRET_ACCESS_KEY")
  require(s3Id.nonEmpty)
  require(s3Key.nonEmpty)

  private[this] implicit val intervalReader: Read[Interval] = Read.reads("interval") { interval =>
    new Interval(interval, ISOChronology.getInstanceUTC)
  }
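
  // Positional CLI arguments: source and target data source, bucket and bucket path,
  // plus the time interval whose segments should be migrated.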
  var sourceDataSource = arg[String](name = "source-data-source")
  var sourceBucket = arg[String](name = "source-bucket")
  var sourceBucketPath = arg[String](name = "source-bucket-path")
  var interval = arg[Interval](name = "interval")
  var targetDataSource = arg[String](name = "target-data-source")
  var targetBucket = arg[String](name = "target-bucket")
  var targetBucketPath = arg[String](name = "target-bucket-path")
  var parallelism = opt[Int](default = 16)
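
  // Illustrative invocation (command name, buckets, paths and interval below are made up):
  //   migrate-s3-storage old-ds source-bucket druid/segments/ \
  //     2019-01-01T00:00:00.000Z/2019-02-01T00:00:00.000Z \
  //     new-ds target-bucket new-location/segments/ --parallelism=16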

  override def run = {
    require(!sourceBucketPath.startsWith("/") && sourceBucketPath.endsWith("/"), "S3 path must not start with '/' and must end with '/' !!!")
    require(!targetBucketPath.startsWith("/") && targetBucketPath.endsWith("/"), "S3 path must not start with '/' and must end with '/' !!!")
    require(!sourceBucketPath.trim.split("/").contains(sourceDataSource), "Path must not contain the data source name as one of its parts !!!")
    require(!targetBucketPath.trim.split("/").contains(targetDataSource), "Path must not contain the data source name as one of its parts !!!")
    require(sourceBucketPath != targetBucketPath, "Are you trying to migrate deep storage to itself ??? Go have some sleep !")

    Common.closableActorSystem("migrate-s3-storage") { implicit system =>
      implicit val materializer = SupervisedMaterializer()
      closableS3Driver { implicit s3 =>
        implicit val s3Client = s3.alpakka(MemoryBufferType)
        S3StorageService(sourceDataSource, targetDataSource, interval, sourceBucket, sourceBucketPath, targetBucket, targetBucketPath, parallelism)
          .migrate
          .andThen {
            case Success(count) =>
              logger.info(s"$count segments successfully migrated ...")
            case Failure(ex) =>
              logger.error("Migration failed", ex)
          }(Implicits.global)
      }
    }
  }
}
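
// Pairs the S3 key of a descriptor.json object with the parsed segment metadata it contains;
// `indexKey` is the load-spec key of the segment's index archive in the source bucket.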
case class Descriptor(key: String, segment: Segment) {
  def indexKey: String = segment.loadSpec.key
}
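
// Streams descriptor.json objects from the source path, rewrites the data source name in their
// keys, load specs and identifiers, copies the referenced index objects to the target bucket and
// writes the updated descriptors next to them.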
case class S3StorageService(
    sourceDataSource: String,
    targetDataSource: String,
    interval: Interval,
    sourceBucket: String,
    sourceBucketPath: String,
    targetBucket: String,
    targetBucketPath: String,
    parallelism: Int) extends StrictLogging {
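
  // Rebuilds an S3 key for the target layout: everything before the source data source name is
  // replaced by `targetBucketPath`, and the data source path element itself is swapped for
  // `targetDataSource` (illustratively, "old/path/old-ds/x/index.zip" -> "new/path/new-ds/x/index.zip").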
  private[this] def modifyS3Key(key: String): String = {
    key.trim.split('/').dropWhile(_ != sourceDataSource) match {
      case arr if arr.nonEmpty =>
        (targetBucketPath.split('/') ++ arr.updated(0, targetDataSource)).mkString("/")
      case _ =>
        sys.error(s"Key is not compatible with the current script implementation:\n$key")
    }
  }

  private[this] def modifyIdentifier(identifier: String) = identifier.replace(sourceDataSource, targetDataSource)
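
  // Produces the rewritten descriptor only when the key, the load-spec key and the identifier
  // all actually change; anything that does not mention the source data source is logged and skipped.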
  protected[script] def modifyDescriptor(descriptor: Descriptor): Option[Descriptor] = descriptor match {
    case Descriptor(key, segment)
        if modifyS3Key(key) != key &&
           modifyS3Key(segment.loadSpec.key) != segment.loadSpec.key &&
           modifyIdentifier(segment.identifier) != segment.identifier =>
      logger.info(s"Processing $key ...")
      Some(
        Descriptor(
          modifyS3Key(key),
          segment.copy(
            dataSource = targetDataSource,
            loadSpec = segment.loadSpec.copy(bucket = targetBucket, key = modifyS3Key(segment.loadSpec.key)),
            identifier = modifyIdentifier(segment.identifier)
          )
        )
      )
    case brokenDescriptor =>
      logger.error(s"Descriptor cannot be modified, skipping !!!\n$brokenDescriptor")
      None
  }

  protected[script] def writeSegment(segment: Segment): String = ObjMapper.miniWriter.writeValueAsString(segment)
  protected[script] def readSegment(segment: String): Segment = ObjMapper.readValue[Segment](segment)
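
  // Downloads one descriptor.json from the source bucket, frames it into JSON objects and
  // parses each into a Segment, keeping the descriptor's own key alongside it.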
  private[this] def segmentReaderFlow(descriptorKey: String)(implicit s3Client: S3Client) =
    s3Client.download(sourceBucket, descriptorKey)
      .via(JsonFraming.objectScanner(32768))
      .map(_.utf8String)
      .map(readSegment)
      .map(Descriptor(descriptorKey, _))
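
  // For every rewritable descriptor: stream-copy the index archive to the target bucket via a
  // multipart upload, then write the rewritten descriptor.json next to it. Each pair of operations
  // runs inside `recover(3, 1.minute)`, which appears to be a bounded-retry helper provided by the
  // surrounding project.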
  private[this] def copyFlow(implicit s3: S3Driver, alpakka: S3Client, m: ActorMaterializer) = {
    Flow[Descriptor]
      .map(descriptor => descriptor -> modifyDescriptor(descriptor))
      .collect { case (oldDescriptor, Some(newDescriptor)) => oldDescriptor -> newDescriptor }
      .mapAsyncUnordered(parallelism) { case (oldDescriptor, newDescriptor) =>
        recover(3, 1.minute) {
          alpakka.download(sourceBucket, oldDescriptor.indexKey).runWith(alpakka.multipartUpload(targetBucket, newDescriptor.indexKey))
            .map(_ => s3.putObject(targetBucket, newDescriptor.key, writeSegment(newDescriptor.segment)))(Implicits.global)
            .map(_ => Done)(Implicits.global)
        }
      }
  }
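
  // Full pipeline: list descriptor.json keys under the source data source prefix, parse them,
  // keep only segments whose interval falls inside the requested one, run the copy flow and
  // count the migrated segments.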
  def migrate(implicit s3: S3Driver, alpakka: S3Client, m: ActorMaterializer, intervalReader: Read[Interval]): Future[Int] =
    s3.objSummarySource(sourceBucket, s"$sourceBucketPath$sourceDataSource/")
      .map(_.getKey)
      .filter(_.endsWith("descriptor.json"))
      .flatMapMerge(parallelism, segmentKey => segmentReaderFlow(segmentKey))
      .filter(descriptor => interval.contains(intervalReader.reads(descriptor.segment.interval)))
      .via(copyFlow)
      .runFold(0) { case (acc, _) => acc + 1 }
}