Spark Streaming job for parsing logs
/*
This is a Spark Streaming job that
takes a raw stream of logs from Flume,
parses each log line into a structured
record, adds a schema, and ultimately
writes the result to HDFS as parquet.
Written by Scott Hoover, 2016.
Send questions to scott@looker.com
or githoov on GitHub.
*/
// imports for Spark, Spark Streaming, the Flume receiver, and dataframe support
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// fire up spark context
val conf = new SparkConf().setAppName("Log Data Webinar")
val sc = new SparkContext(conf)
// fire up sql context for dataframe operations
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// fire up streaming context
val ssc = new StreamingContext(sc, Seconds(5))
// fire up flume stream and decode each event body into a raw log line
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 9988)
val lines = flumeStream.map(event => new String(event.event.getBody.array(), "UTF-8"))
// for batch testing
// val lines = sc.textFile("hdfs://ip-10-55-0-147.ec2.internal:8020/logdata/looker.log.20160420.gz")
// regular expression to match the timestamp, message information, and the message body
val lineMatch = "([0-9\\-]+\\s+[0-9\\:\\.]+\\s+[\\+0-9]+)\\s+(?:\\[)(\\S+)(?:\\])(.*)".r
// case class for parsed log-line structure
case class LogLine(
  created_at: String,
  message_type: Option[String],
  thread_id: Option[String],
  message_source: Option[String],
  message: String
)
// function to extract message type from message information
def messageType(message_data: String): Option[String] = {
  "[A-Z]+".r.findFirstIn(message_data)
}
// function to extract thread identifier from message information
def threadId(message_data: String): Option[String] = {
  "[a-z0-9]+".r.findFirstIn(message_data)
}
// function to extract message source from message information
def messageSource(message_data: String): Option[String] = {
  val fields = message_data.split("\\|")
  if (fields.length >= 3) Option(fields(2)) else None
}
// function to match lineMatch regular expression and output LogLine class
def extractValues(line: String): Option[LogLine] = {
  line match {
    case lineMatch(created_at, message_data, message) =>
      Option(LogLine(created_at, messageType(message_data), threadId(message_data), messageSource(message_data), message))
    case _ =>
      None
  }
}
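// As a quick sanity check, extractValues can be applied to a single line in the REPL.
// The sample line below is hypothetical (the exact Looker log format is an assumption),
// but it illustrates which fields the regex and helper functions pull out:
// extractValues("2016-04-20 01:02:03.456 +0000 [INFO|6a7f2d|Request] completed query in 42ms")
// // => Some(LogLine("2016-04-20 01:02:03.456 +0000", Some("INFO"), Some("6a7f2d"), Some("Request"), " completed query in 42ms"))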
// parse raw lines
val parsedLines = lines.flatMap(extractValues)
// for each micro-batch, convert the RDD to a dataframe (with schema) and append it to parquet on HDFS
parsedLines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val linesDF = sqlContext.createDataFrame(rdd)
    linesDF.write.mode("append").parquet("hdfs://ip-10-55-0-147.ec2.internal:8020/home/hadoop/logdata/parsed_lines.parquet")
  }
}
// start the streaming job and keep it running
ssc.start()
ssc.awaitTermination()
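/*
Usage note (an assumption, not part of the original job): one way to run this script is to
paste it into a spark-shell session launched with the Flume integration on the classpath,
for example:

  spark-shell --packages org.apache.spark:spark-streaming-flume_2.10:1.6.1

The Flume host/port (localhost:9988), the HDFS namenode address, and the package version
above should be adjusted to match the actual cluster and Spark build.
*/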