Skip to content

Instantly share code, notes, and snippets.

@airawat airawat/CompactRawLogs
Last active Aug 18, 2017

What would you like to do?
spark-submit --class com.khanolkar.bda.util.CompactRawLogs \
MyJar-1.0.jar \
"/user/akhanolk/data/raw/streaming/to-be-compacted/" \
"/user/akhanolk/data/raw/compacted/" \
"2" "128" "oozie-124"
package com.khanolkar.bda.util
/**
 * @author Anagha Khanolkar
 *
 * TODO: add robust exception handling and logging (integrate into OMS)
 */
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import scala.util.control.NonFatal
object CompactRawLogs {

  /**
   * Entry point: compacts many small raw log files under an input directory
   * into fewer, larger files under an output directory, then deletes the
   * original raw-log directories on success.
   *
   * Expected arguments:
   *   args(0) inputDirectory           - logs to be compacted
   *   args(1) outputDirectory          - destination for compacted logs
   *   args(2) minFilesToCompact        - minimum file count to trigger compaction
   *   args(3) minCompactedFileSizeInMB - target compacted file size in MB (e.g. 128)
   *   args(4) oozieWorkflowID          - correlates this batch with the Oozie workflow
   */
  def main(args: Array[String]): Unit = {
    // Command-line arguments check — exit early so we never run with missing args.
    if (args.length != 5) {
      println("Please provide 5 parameters: <inputDirectory> <outputDirectory> <minFilesToCompact> <minCompactedFileSizeInMB> <oozieWorkflowID>")
      System.exit(1)
    }

    // Capture command-line arguments
    val inputDirectory = args(0)           // Logs to be compacted
    val outputDirectory = args(1)          // Compacted logs
    val minFilesToCompact = args(2).toInt  // Minimum number of files to execute compaction
    val minCompactedFileSizeInMB = args(3).toInt // E.g. 128 will compact into ~128 MB files
    val oozieWorkflowID = args(4)          // Correlate batch with workflow when we log status

    // Spark session (YARN master, matching the original job submission)
    val sparkSession: SparkSession = SparkSession.builder().master("yarn").getOrCreate()
    import sparkSession.implicits._

    // Recursive glob support so nested raw-log directories are picked up
    sparkSession.sparkContext.hadoopConfiguration
      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")

    // FileStatus of the input directory's children — used to delete raw log
    // directories after a successful compaction.
    val fs = FileSystem.get(new Configuration())
    val inputDirFileStatusArr = fs.listStatus(new Path(inputDirectory))

    // Proceed with compaction only if we have enough files to compact
    if (inputDirFileStatusArr.length > minFilesToCompact) {
      var errorString = ""
      try {
        // Determine how many files to coalesce to, based on total input size.
        // Widen to Long before multiplying: Int overflows at >= 2048 MB.
        val inputDirSize = fs.getContentSummary(new Path(inputDirectory)).getLength
        var outputFileCount =
          Math.floor(inputDirSize.toDouble / (minCompactedFileSizeInMB.toLong * 1024 * 1024)).toInt
        if (outputFileCount == 0) {
          outputFileCount = 1 // Always write at least one output file
        }

        // Read, coalesce, and write the compacted output under the workflow ID
        val datasetRDD = sparkSession.sparkContext.textFile(inputDirectory)
        datasetRDD.coalesce(outputFileCount).saveAsTextFile(outputDirectory + "/" + oozieWorkflowID)

        // No errors, so delete raw logs in input directory by iterating over FileStatus
        inputDirFileStatusArr.foreach { status =>
          fs.delete(status.getPath, true)
        }
      } catch {
        // NonFatal: let OutOfMemoryError / InterruptedException etc. propagate.
        case NonFatal(e) =>
          // mkString, not toString: Array.toString yields "[Ljava...@hash", not the trace
          errorString = e.getStackTrace.mkString("\n")
      } finally {
        // TODO: Log to integrate into OMS - workflowID | errorString
        // NOTE(review): original gist tail was truncated here; stopping the
        // session on the way out is assumed to be the intended cleanup — confirm.
        sparkSession.stop()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.