def runHybrid(s3Keys: Seq[String], parallelism: Int): Stats = {
  sc.parallelize(s3Keys, parallelism)
    .mapPartitions { keys =>
      // One S3 client per partition: the client is not serializable, so it is
      // created on the executor rather than captured in the driver closure.
      val s3 = IO.getS3(s3Id, s3Key, s3Region)
      def read = keys.map { key =>
        key -> MiLogFileParser.getBytes(s3.getObject(inputBucket, key).getObjectContent, true)
      }
      // `keys` is an Iterator, so `read` is lazy: this times only the iterator
      // construction, not the S3 downloads (see the note on the stats below).
      val (contentByKey, took) = IO.profile(() => read)
      readingAcc.add(took)
      contentByKey
    }.flatMap { case (key, content) =>
      val lines = new String(content).split('\n')
      if (lines.length > 4) {
        val ((result, errors), parsingTook) = IO.profile(() => MiLogFileParser.parse(lines))
        parsingAcc.add(parsingTook)
        if (errors.nonEmpty) {
          println(s"ERROR: File $key has unexpected parsing errors !!!")
          errors.foreach(println)
        }
        // Turn each parsed row into (targetFilePath -> json) so the output
        // format can route records into per-file outputs.
        def process = result
          .map(MiRowProcessor.process)
          .map { record =>
            MiRowProcessor.getFilePath(MiRowProcessor.getFileName(record, "foo", "json")) ->
              ObjMapper.miniWriter.writeValueAsString(record)
          }
        val (recordsByFilePath, processingTook) = IO.profile(() => process)
        processingAcc.add(processingTook)
        recordsByFilePath
      } else Seq.empty
    }.saveAsHadoopFile(s"s3a://$s3Id:$s3Key@$s3OutputPath", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat], classOf[GzipCodec])
  Stats(readingAcc, parsingAcc, processingAcc)
}
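
For context, here is a minimal sketch of the timing helper the function leans on, under the assumption that IO.profile runs a thunk once and returns its result together with the elapsed milliseconds, which the caller then adds to plain Spark double accumulators (readingAcc, parsingAcc, processingAcc). The real helper in this codebase may differ:

// Hypothetical shape of IO.profile, inferred from its call sites above.
def profile[T](op: () => T): (T, Double) = {
  val start  = System.nanoTime()
  val result = op()                               // run the measured thunk once
  val tookMs = (System.nanoTime() - start) / 1e6  // elapsed wall-clock time in ms
  (result, tookMs)
}

One caveat visible in the numbers below: because `keys` is an Iterator, the `read` thunk only builds a lazy mapping, so IO.profile times that construction rather than the S3 downloads themselves; the downloads actually run while the downstream flatMap pulls from the iterator. That is consistent with the `reading` accumulator summing to 0.716 ms across 24 partitions while parsing and processing total minutes.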
/**
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO TaskSetManager: Finished task 23.0 in stage 0.0 (TID 23) in 79333 ms on docker_worker_1.docker_default (24/24)
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO DAGScheduler: ResultStage 0 (saveAsHadoopFile at BaseETL.scala:74) finished in 356.936 s
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO DAGScheduler: Job 0 finished: saveAsHadoopFile at BaseETL.scala:74, took 357.074586 s
s3-spark-base-etl_1 | 16/06/08 20:24:08 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.0.6:39826 in memory (size: 26.2 KB, free: 5.0 GB)
s3-spark-base-etl_1 | 16/06/08 20:24:08 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.0.3:38578 in memory (size: 26.2 KB, free: 10.4 GB)
s3-spark-base-etl_1 |
s3-spark-base-etl_1 | reading:
s3-spark-base-etl_1 | count : 24
s3-spark-base-etl_1 | sum : 0.716 ms
s3-spark-base-etl_1 | op takes : 0.03 ms
s3-spark-base-etl_1 | rate ops/s : 33 504.719
s3-spark-base-etl_1 |
s3-spark-base-etl_1 | parsing:
s3-spark-base-etl_1 | count : 2616
s3-spark-base-etl_1 | sum : 305 332.964 ms
s3-spark-base-etl_1 | op takes : 116.717 ms
s3-spark-base-etl_1 | rate ops/s : 8.568
s3-spark-base-etl_1 |
s3-spark-base-etl_1 | processing:
s3-spark-base-etl_1 | count : 2616
s3-spark-base-etl_1 | sum : 701 798.916 ms
s3-spark-base-etl_1 | op takes : 268.272 ms
s3-spark-base-etl_1 | rate ops/s : 3.728
s3-spark-base-etl_1 |
s3-spark-base-etl_1 | 16/06/08 20:36:01 INFO SparkUI: Stopped Spark web UI at http://54.171.153.241:4040
s3-spark-base-etl_1 | 16/06/08 20:36:01 INFO SparkDeploySchedulerBackend: Shutting down all executors
*/
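
RDDMultipleTextOutputFormat is referenced but not included in the gist. A common way to implement it, sketched here as an assumption rather than the author's actual class, is to subclass Hadoop's MultipleTextOutputFormat so that each pair's key (the file path produced by MiRowProcessor.getFilePath) names the output file and only the JSON value is written:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical implementation; the gist's actual class may differ.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Suppress the key in the written records; it is consumed as the path below.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Route each record to the file named by its key.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

With classOf[GzipCodec] passed to saveAsHadoopFile, each of those per-key output files is gzip-compressed on write.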