Skip to content

Instantly share code, notes, and snippets.

@sreejithpillai
Last active February 19, 2017 15:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sreejithpillai/8ea546be0c77dddc39513131a9d4f0ce to your computer and use it in GitHub Desktop.
Save sreejithpillai/8ea546be0c77dddc39513131a9d4f0ce to your computer and use it in GitHub Desktop.
Log analyzer spark-scala code
package com.demo.loganalyzer
/**
*
* Created by Sreejith Pillai.
*
* */
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
/**
 * A single parsed log record, as produced by `MapRawData.mapRawLine`.
 *
 * @param address  first comma-separated field of the raw line
 * @param datetime two-character slice (indices 13 until 15) of the second field
 * @param action   third field when it is longer than two characters, otherwise None
 */
case class LogSchema(address: String, datetime: String, action: Option[String])
/**
 * Aggregates parsed log records by their `datetime` value and writes the counts out as text.
 */
class TransformMapper extends Logging {

  /**
   * Counts how many records share each distinct `datetime` value and saves the
   * resulting `(datetime, count)` pairs with `saveAsTextFile`.
   *
   * @param events     parsed log records
   * @param outputPath target directory for `saveAsTextFile`; defaults to the location that
   *                   was previously hard-coded, so existing callers are unaffected.
   *                   NOTE(review): Hadoop output formats usually refuse an existing
   *                   directory — confirm the job's cleanup strategy.
   */
  def transform(
    events: RDD[LogSchema],
    outputPath: String = "/user/sreejith/loganalyzer/logoutput/"
  ): Unit = {
    val countsByDatetime = events.map(event => (event.datetime, 1)).reduceByKey(_ + _)
    countsByDatetime.saveAsTextFile(outputPath)
  }
}
/**
 * Parses raw comma-separated log lines into [[LogSchema]] records.
 */
object MapRawData extends Serializable with Logging {

  // Slice [DatetimeStart, DatetimeEnd) of the second field is kept as `datetime`.
  // NOTE(review): for a value like "[10/Oct/2000:13:55:36 -0700]" this is the hour ("13") —
  // confirm against the real input format.
  private final val DatetimeStart = 13
  private final val DatetimeEnd = 15

  /**
   * Parses one raw line.
   *
   * The line is split on commas (trailing empty fields are kept) and every field is trimmed.
   * Field 0 becomes `address`, characters 13 until 15 of field 1 become `datetime`, and
   * field 2 becomes `action` only when it is longer than two characters.
   * Malformed input is detected up front instead of relying on thrown exceptions.
   *
   * @param line raw input line (may be null)
   * @return the parsed record, or None (after logging a warning) when the line is null,
   *         has fewer than three fields, or its second field is shorter than 15 characters
   */
  def mapRawLine(line: String): Option[LogSchema] = {
    val parsed = Option(line).map(_.split(",", -1).map(_.trim)).collect {
      case Array(address, rawDatetime, rawAction, _*) if rawDatetime.length >= DatetimeEnd =>
        LogSchema(
          address = address,
          datetime = rawDatetime.substring(DatetimeStart, DatetimeEnd),
          // NOTE(review): `> 2` presumably skips placeholders such as an empty quoted `""` — confirm.
          action = if (rawAction.length > 2) Some(rawAction) else None
        )
    }
    if (parsed.isEmpty) log.warn(s"Unable to parse line: $line")
    parsed
  }
}
package com.demo.loganalyzer
import org.apache.spark._
import org.joda.time.DateTime
/**
 * Entry point of the log-analyzer Spark job: reads raw log lines, parses them with
 * `MapRawData.mapRawLine`, and hands the parsed records to `transform`.
 *
 * Usage: `RunMainJob [inputPath]` — when no argument is given, the original default
 * input path is used.
 *
 * Created by Sreejith Pillai.
 */
object RunMainJob extends TransformMapper with Logging {

  /** Input location used when no path is passed on the command line. */
  private val DefaultInputPath = "/user/sreejith/testlog"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-loganalyzer")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if a stage fails.
    try {
      val startTimeJob = new DateTime(sc.startTime)
      val applicationId = sc.applicationId
      log.info("Application launched with id : " + applicationId)
      log.info(s"Application started at : $startTimeJob")

      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val rawData = sc.textFile(inputPath)
      val numberOfRawLines = rawData.count()
      log.info("Number of lines to parse : " + numberOfRawLines)

      // Cached because it is consumed twice: once by count() below and once by transform().
      val parseData = rawData.flatMap(line => MapRawData.mapRawLine(line)).cache()
      log.info("Number of lines after parsing: " + parseData.count())
      transform(parseData)
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment