Created
June 8, 2016 20:49
-
-
Save l15k4/25588d35a6c786b4ade514739c0195ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reads the given S3 objects in parallel, parses them, processes each row into
 * a (output-file-path, JSON-line) pair and writes the result as gzipped text
 * files partitioned by path. Per-stage timings are collected in the
 * readingAcc / parsingAcc / processingAcc accumulators.
 *
 * @param s3Keys      keys of the input objects in `inputBucket`
 * @param parallelism number of RDD partitions to spread the keys over
 * @return aggregated timing stats for the reading / parsing / processing stages
 */
def runHybrid(s3Keys: Seq[String], parallelism: Int): Stats = {
  sc.parallelize(s3Keys, parallelism)
    .mapPartitions { keys =>
      // One S3 client per partition: it must be created on the executor,
      // not serialized from the driver.
      val s3 = IO.getS3(s3Id, s3Key, s3Region)
      // BUG FIX: `keys` is an Iterator, so mapping over it is lazy. The old
      // code profiled the *lazy* expression and therefore measured only
      // iterator construction (~0.03 ms/op in the logged run), while the
      // actual S3 download time leaked into the downstream stages. Profile
      // each download individually instead; this keeps the partition
      // streaming (no materialization) and makes readingAcc meaningful.
      keys.map { key =>
        val (bytes, took) = IO.profile(() =>
          MiLogFileParser.getBytes(s3.getObject(inputBucket, key).getObjectContent, true))
        readingAcc.add(took)
        key -> bytes
      }
    }.flatMap { case (key, content) =>
      // Decode with an explicit charset: `new String(bytes)` depends on the
      // JVM default, which may differ between driver and executor images.
      val lines = new String(content, java.nio.charset.StandardCharsets.UTF_8).split('\n')
      // Files with <= 4 lines are skipped.
      // NOTE(review): presumably header-only/empty files — confirm the
      // 4-line threshold against the log-file format.
      if (lines.length > 4) {
        val ((result, errors), parsingTook) = IO.profile(() => MiLogFileParser.parse(lines))
        parsingAcc.add(parsingTook)
        if (errors.nonEmpty) {
          println(s"ERROR: File $key has unexpected parsing errors !!!")
          errors.foreach(println)
        }
        // Turn each parsed row into (output file path, serialized JSON record).
        def process = result
          .map(MiRowProcessor.process)
          .map { record =>
            MiRowProcessor.getFilePath(MiRowProcessor.getFileName(record, "foo", "json")) ->
              ObjMapper.miniWriter.writeValueAsString(record)
          }
        val (recordsByFilePath, processingTook) = IO.profile(() => process)
        processingAcc.add(processingTook)
        recordsByFilePath
      } else Seq.empty
    // SECURITY(review): embedding the secret key in the s3a URI leaks it into
    // logs and the Spark UI; prefer fs.s3a.access.key/secret.key Hadoop conf.
    }.saveAsHadoopFile(s"s3a://$s3Id:$s3Key@$s3OutputPath", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat], classOf[GzipCodec])
  Stats(readingAcc, parsingAcc, processingAcc)
}
/** | |
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO TaskSetManager: Finished task 23.0 in stage 0.0 (TID 23) in 79333 ms on docker_worker_1.docker_default (24/24) | |
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO DAGScheduler: ResultStage 0 (saveAsHadoopFile at BaseETL.scala:74) finished in 356.936 s | |
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool | |
s3-spark-base-etl_1 | 16/06/08 20:23:43 INFO DAGScheduler: Job 0 finished: saveAsHadoopFile at BaseETL.scala:74, took 357.074586 s | |
s3-spark-base-etl_1 | 16/06/08 20:24:08 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.0.6:39826 in memory (size: 26.2 KB, free: 5.0 GB) | |
s3-spark-base-etl_1 | 16/06/08 20:24:08 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 172.18.0.3:38578 in memory (size: 26.2 KB, free: 10.4 GB) | |
s3-spark-base-etl_1 | | |
s3-spark-base-etl_1 | reading: | |
s3-spark-base-etl_1 | count : 24 | |
s3-spark-base-etl_1 | sum : 0.716 ms | |
s3-spark-base-etl_1 | op takes : 0.03 ms | |
s3-spark-base-etl_1 | rate ops/s : 33 504.719 | |
s3-spark-base-etl_1 | | |
s3-spark-base-etl_1 | parsing: | |
s3-spark-base-etl_1 | count : 2616 | |
s3-spark-base-etl_1 | sum : 305 332.964 ms | |
s3-spark-base-etl_1 | op takes : 116.717 ms | |
s3-spark-base-etl_1 | rate ops/s : 8.568 | |
s3-spark-base-etl_1 | | |
s3-spark-base-etl_1 | processing: | |
s3-spark-base-etl_1 | count : 2616 | |
s3-spark-base-etl_1 | sum : 701 798.916 ms | |
s3-spark-base-etl_1 | op takes : 268.272 ms | |
s3-spark-base-etl_1 | rate ops/s : 3.728 | |
s3-spark-base-etl_1 | | |
s3-spark-base-etl_1 | 16/06/08 20:36:01 INFO SparkUI: Stopped Spark web UI at http://54.171.153.241:4040 | |
s3-spark-base-etl_1 | 16/06/08 20:36:01 INFO SparkDeploySchedulerBackend: Shutting down all executors | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment