Skip to content

Instantly share code, notes, and snippets.

/**
 * Merges each batch of small parquet files into a single file under `targetDirectory`.
 *
 * Batches are processed in parallel on a dedicated ForkJoinPool sized to the
 * number of batches. Batch entries are "path|size" strings as produced by
 * makeMergeBatches; only the path portion is passed to the parquet reader.
 *
 * @param spark           active SparkSession used to read/write parquet
 * @param grouped         batches of "path|size" entries to merge, one output file per batch
 * @param targetDirectory destination directory (trailing slash normalized)
 */
def mergeFiles(spark: SparkSession, grouped: ListBuffer[ListBuffer[String]], targetDirectory: String): Unit = {
  val startedAt = System.currentTimeMillis()
  // One worker per batch so every batch can merge concurrently.
  val forkJoinPool = new ForkJoinPool(grouped.size)
  val parallelBatches = grouped.par
  parallelBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
  try {
    parallelBatches.foreach { batch =>
      logger.debug(s"Merging ${batch.size} files into one")
      try {
        // Entries are "path|size" (see makeMergeBatches); keep only the path.
        val paths = batch.toList.map(_.split('|').head)
        spark.read.parquet(paths: _*).coalesce(1).write.mode("append").parquet(targetDirectory.stripSuffix("/") + "/")
      } catch {
        // NonFatal so OOM / interrupts still propagate; one bad batch must not kill the rest.
        case scala.util.control.NonFatal(e) =>
          logger.error(s"Failed to merge batch of ${batch.size} files", e)
      }
    }
  } finally {
    // Release the dedicated pool's threads even if a batch throws.
    forkJoinPool.shutdown()
  }
  logger.info(s"Merged ${grouped.size} batches in ${System.currentTimeMillis() - startedAt} ms")
}
/**
 * Greedily packs files into batches whose combined size stays below
 * `maxTargetFileSize`, so each batch can later be merged into roughly
 * one target-sized file.
 *
 * Files at or above `maxTargetFileSize` are already "big enough" and are
 * excluded from the result. Each batch entry is encoded as "path|size".
 *
 * @param fileSizesMap      file path -> size in bytes
 * @param maxTargetFileSize upper bound (exclusive) on a batch's combined size
 * @return batches of "path|size" entries; a single (possibly empty) batch for empty input
 */
def makeMergeBatches(fileSizesMap: scala.collection.immutable.Map[String, Long], maxTargetFileSize: Long): ListBuffer[ListBuffer[String]] = {
  // Ascending by size so small files are packed together first.
  val sortedFileSizes = fileSizesMap.toSeq.sortBy(_._2)
  // Only files smaller than the target size are merge candidates.
  val smallerFiles = sortedFileSizes.filter(_._2 < maxTargetFileSize)

  // Sum of the sizes encoded after the '|' in each batch entry.
  def sizeOfThisBatch(batch: ListBuffer[String]): Long =
    batch.map(_.split('|').last.toLong).sum

  val groupedFiles = ListBuffer[ListBuffer[String]]()
  groupedFiles += ListBuffer[String]()
  for (aFile <- smallerFiles) {
    val lastBatch = groupedFiles.last
    if ((sizeOfThisBatch(lastBatch) + aFile._2) < maxTargetFileSize) {
      // Still room in the current batch.
      lastBatch += aFile._1 + "|" + aFile._2.toString
    } else {
      // Current batch is full: start a new one seeded with this file.
      val newBatch = ListBuffer[String]()
      newBatch += aFile._1 + "|" + aFile._2.toString
      groupedFiles += newBatch
    }
  }
  groupedFiles
}
// NOTE(review): fragment of the driver/main body — its enclosing def is not
// visible in this view. CLI contract appears to be:
//   args(1) = s3:// input URI, args(2) = target directory, args(3) = max merged file size (bytes)
val (inputBucket, prefix) = getBucketNameAndPrefix(args(1))
val targetDirectory = args(2)
val maxIndividualMergedFileSize = args(3).toLong
// Build fully-qualified s3:// URIs for each input directory under the prefix.
val inputDirs = listDirectoriesInS3(inputBucket, prefix).map(prefix => "s3://" + inputBucket + "/" + prefix)
logger.info(s"Total directories found : ${inputDirs.size}")
val startedAt = System.currentTimeMillis()
//You may want to tweak the following to set how many input directories to process in parallel
// (one ForkJoin worker per directory means ALL directories run concurrently).
val forkJoinPool = new ForkJoinPool(inputDirs.size)
val parallelBatches = inputDirs.par
/**
 * Returns the size in bytes of every data file under `bucketName`/`prefix`.
 *
 * Spark/Hadoop bookkeeping files (basename starting with "_", e.g. _SUCCESS)
 * are skipped. Sizes are fetched with one HEAD request per object.
 *
 * NOTE(review): listObjectsV2(bucket, prefix) returns at most one page
 * (1000 keys); prefixes with more objects need continuation-token
 * pagination — TODO confirm expected directory sizes.
 *
 * @param bucketName S3 bucket to list
 * @param prefix     key prefix (directory) to list under
 * @return immutable map of object key -> content length in bytes
 */
def getFileSizes(bucketName: String, prefix: String): scala.collection.immutable.Map[String, Long] = {
  val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build()
  val listing = s3.listObjectsV2(bucketName, prefix)
  // Drop bookkeeping files whose basename starts with "_".
  val files = listing.getObjectSummaries.asScala.map(_.getKey).filter(!_.split("/").last.startsWith("_"))
  // Build the result immutably instead of accumulating into a mutable map.
  files.map { key =>
    val meta = s3.getObjectMetadata(new GetObjectMetadataRequest(bucketName, key))
    key -> meta.getContentLength
  }.toMap
}
@justhackit
justhackit / data-rates.csv
Last active July 24, 2021 21:22
Data Rates table
Media Transfer Rate per second Rate in MB per second
Mobile 5G 60Mb 7.5
WiFi 125Mb 15
Fiber Internet
Cable Internet 100Mb 12.5
Ethernet cable(LAN) 10Gb 1250
SD Card 10MB 10
HDD 200MB 200
SSD 550MB 550
RAM 25GB 25000