This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Merges each pre-computed batch of parquet files into a single output file,
 * processing all batches in parallel on a dedicated ForkJoinPool.
 *
 * @param spark           active SparkSession used to read and write parquet
 * @param grouped         batches of file references; each inner list is merged into one file
 * @param targetDirectory output directory; merged files are written in append mode
 *
 * NOTE(review): this fragment is truncated — the catch handler and any pool
 * shutdown/elapsed-time logging are not visible here. "parllelBatches" is a
 * typo for "parallelBatches". The trailing " | |" tokens on each line are
 * residue from the source listing this was scraped from, not Scala.
 */
def mergeFiles(spark: SparkSession, grouped: ListBuffer[ListBuffer[String]], targetDirectory: String): Unit = { | |
// Wall-clock start; presumably reported after the merge — not visible in this fragment.
val startedAt = System.currentTimeMillis() | |
// One pool thread per batch — unbounded for large `grouped`; confirm this sizing is intended.
val forkJoinPool = new ForkJoinPool(grouped.size) | |
val parllelBatches = grouped.par | |
// Route the parallel collection's tasks onto our dedicated pool instead of the global one.
parllelBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool) | |
parllelBatches foreach (batch => { | |
logger.debug(s"Merging ${batch.size} files into one") | |
try { | |
// Read all files in the batch, collapse to one partition, and append a single
// merged parquet file under targetDirectory (normalized to end with exactly one "/").
spark.read.parquet(batch.toList: _*).coalesce(1).write.mode("append").parquet(targetDirectory.stripSuffix("/") + "/") | |
} catch { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Greedily bin-packs files into batches whose cumulative size stays under
 * maxTargetFileSize. Each batch entry is encoded as "path|size" (pipe-delimited),
 * so consumers must split on "|" to recover the path.
 *
 * @param fileSizesMap      map of file path -> size in bytes
 * @param maxTargetFileSize upper bound (bytes) for the sum of sizes in one batch
 * @return batches of "path|size" strings
 *
 * NOTE(review): fragment is truncated after `val newBatch = ...`. Also,
 * `sortedFileSizes` is computed but the loop iterates `smallerFiles`, which is
 * not defined in this view — confirm which collection actually feeds the loop.
 */
def makeMergeBatches(fileSizesMap: scala.collection.immutable.Map[String, Long], maxTargetFileSize: Long): ListBuffer[ListBuffer[String]] = { | |
// Sort ascending by size so small files are packed together first.
val sortedFileSizes = fileSizesMap.toSeq.sortBy(_._2) | |
val groupedFiles = ListBuffer[ListBuffer[String]]() | |
// Seed with one empty batch so `groupedFiles.last` below is always defined.
groupedFiles += ListBuffer[String]() | |
for (aFile <- smallerFiles) { | |
val lastBatch = groupedFiles.last | |
// Add to the current batch only if it stays under the size cap; otherwise start a new batch.
if ((sizeOfThisBatch(lastBatch) + aFile._2) < maxTargetFileSize) { | |
// Encode as "path|size" so batch size can be recomputed without re-querying S3.
lastBatch += aFile._1 + "|" + aFile._2.toString | |
} else { | |
val newBatch = ListBuffer[String]() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// --- driver fragment: parse CLI args and fan out one merge job per input directory ---
// NOTE(review): truncated at both ends; args(0) usage and the loop body are not visible.
// Expected CLI shape (from the reads below): args(1)=s3 input path, args(2)=target
// directory, args(3)=max merged file size in bytes.
val (inputBucket, prefix) = getBucketNameAndPrefix(args(1)) | |
val targetDirectory = args(2) | |
// Throws NumberFormatException if args(3) is not a valid long — no validation here.
val maxIndividualMergedFileSize = args(3).toLong | |
// Rebuild full s3:// URIs from the bucket + each listed prefix.
val inputDirs = listDirectoriesInS3(inputBucket, prefix).map(prefix => "s3://" + inputBucket + "/" + prefix) | |
logger.info(s"Total directories found : ${inputDirs.size}") | |
val startedAt = System.currentTimeMillis() | |
//You may want to tweak the following to set how many input directories to process in parallel | |
// One pool thread per directory — unbounded for large listings; confirm this sizing is intended.
val forkJoinPool = new ForkJoinPool(inputDirs.size) | |
val parallelBatches = inputDirs.par |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Returns a map of S3 object key -> size in bytes for every object under
 * bucketName/prefix, skipping metadata files whose final path segment starts
 * with "_" (e.g. Spark's "_SUCCESS" marker).
 *
 * Fix over the original: sizes are taken from the listing's object summaries
 * (S3ObjectSummary.getSize), which the ListObjectsV2 response already carries,
 * instead of issuing one GetObjectMetadata request per key — removing n extra
 * round trips to S3 for an n-object prefix.
 *
 * NOTE(review): listObjectsV2 returns at most 1000 keys per call; if a prefix
 * can hold more, this must loop on getNextContinuationToken — TODO confirm
 * expected prefix sizes before relying on a single call.
 *
 * @param bucketName S3 bucket to list
 * @param prefix     key prefix to list under
 * @return immutable map of object key -> content length in bytes
 */
def getFileSizes(bucketName: String, prefix: String): scala.collection.immutable.Map[String, Long] = {
  val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build()
  val listing = s3.listObjectsV2(bucketName, prefix)
  listing.getObjectSummaries.asScala
    .filter(summary => !summary.getKey.split("/").last.startsWith("_"))
    .map(summary => summary.getKey -> summary.getSize.longValue())
    .toMap
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Media | Transfer Rate per second | Rate in MB per second |
---|---|---|
Mobile 5G | 60Mb | 7.5 | |
WiFi | 125Mb | 15 | |
Fiber Internet | | |
Cable Internet | 100Mb | 12.5 |
Ethernet cable(LAN) | 10Gb | 1250 | |
SD Card | 10MB | 10 | |
HDD | 200MB | 200 | |
SSD | 550MB | 550 | |
RAM | 25GB | 25000 |