Skip to content

Instantly share code, notes, and snippets.

@pandeykumar
Created September 14, 2016 16:51
Show Gist options
  • Save pandeykumar/8cc9c7ffc9806988d45a06916b7723e3 to your computer and use it in GitHub Desktop.
Save pandeykumar/8cc9c7ffc9806988d45a06916b7723e3 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
/**
 * One parsed access-log entry.
 *
 * `toDF()` derives the DataFrame column names from these fields, and those columns are what get
 * appended over JDBC, so the snake_case names are intentional and should not be renamed.
 *
 * @param date_time     timestamp of the log line with the ",millis" suffix removed
 * @param log_level     log level token, e.g. `INFO`
 * @param http_verb     HTTP method, e.g. `GET`
 * @param path          request path including any query string
 * @param response_time response time in the unit printed by the log (the parser strips "ms")
 * @param status_code   HTTP status code
 * @param client_ip     client address taken from the trailing brackets
 */
case class LogRecord(
    date_time: String,
    log_level: String,
    http_verb: String,
    path: String,
    response_time: Int,
    status_code: Int,
    client_ip: String
)
/**
 * Spark job that parses application access-log lines and appends them to the MySQL table
 * `api_external_response_log`.
 *
 * Usage: `ProcessLog <input-path>`; the path is handed to `SparkContext.textFile`.
 */
object ProcessLog {
  import scala.util.control.NonFatal

  /**
   * Regex for one access-log line, for example:
   * {{{
   * 2016-09-13 00:32:53,857 1qZKVvP8CeHW5bEdK76j [dispatcher-42] INFO application - GET /healthcheck.html took 2ms and returned 200 [10.164.21.159]
   * }}}
   * Capture groups (the indices are relied on by `parseLogLine`):
   *  1 timestamp, 2 optional id token, 3 thread name, 4 log level, 5 logger name,
   *  6 HTTP verb, 7 request path, 8 word before the timing, 9 response time such as `29ms`,
   *  10 and 11 the two words before the status, 12 status code, 13 client IP.
   *
   * The id token is the optional group `(?:(\S+) )?` so that lines without it still match;
   * a `(\S+)*` form would require two spaces there, and its nested quantifier can backtrack
   * exponentially on lines that do not match.
   */
  val PATTERN: scala.util.matching.Regex =
    """^(\S+ \S+) (?:(\S+) )?\[(\S+)\] (\S+) (\S+) \- (\S+) (\S+) (\S+) (\S+) ((\S+ \S+)) (\d{3}) \[(\S+)\]""".r

  /** Placeholder returned by `parseLogLine` for a line that cannot be parsed. */
  private val Rejected = LogRecord("Empty", "-", "-", "", -1, -1, "")

  /** JDBC endpoint; credentials are supplied through `jdbcProperties`, never in the URL. */
  private val MysqlConnectionUrl = "jdbc:mysql://host:3306/platformdata"

  /**
   * Entry point. Expects the input path as the first argument; prints a message and exits
   * without starting Spark when it is missing.
   */
  def main(args: Array[String]): Unit = args.headOption match {
    case None =>
      println("unable to get argument to process")
    case Some(inputPath) =>
      println("Argument to process : " + inputPath)
      val spark = SparkSession
        .builder()
        .appName("Spark PayclipLogProcessor")
        .getOrCreate()
      // Always release the session, even if reading or the JDBC write fails.
      try processFile(spark, inputPath)
      finally spark.stop()
  }

  /**
   * Parses one log line.
   *
   * @param log raw log line
   * @return the parsed record, or a placeholder whose `date_time` is `"Empty"` when the line
   *         does not match [[PATTERN]] or a numeric field cannot be converted
   */
  def parseLogLine(log: String): LogRecord = parseRecord(log).getOrElse(Rejected)

  /** Reads `inputPath`, keeps lines mentioning "returned", parses them and appends to MySQL. */
  private def processFile(spark: SparkSession, inputPath: String): Unit = {
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    // Unparseable lines become None and are dropped by flatMap instead of a sentinel filter.
    val records = spark.sparkContext
      .textFile(inputPath)
      .filter(_.contains("returned"))
      .flatMap(line => parseRecord(line).toList)

    records
      .toDF()
      .write
      .mode(SaveMode.Append)
      .jdbc(MysqlConnectionUrl, "api_external_response_log", jdbcProperties())
  }

  /**
   * Connection properties for the MySQL writer. The credentials are read from the
   * `MYSQL_USERNAME` / `MYSQL_PWD` environment variables, falling back to the placeholders
   * `user` / `passwd`.
   */
  private def jdbcProperties(): java.util.Properties = {
    val props = new java.util.Properties
    props.setProperty("user", sys.env.getOrElse("MYSQL_USERNAME", "user"))
    props.setProperty("password", sys.env.getOrElse("MYSQL_PWD", "passwd"))
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    props
  }

  /**
   * Parses one log line, reporting failures on stdout.
   *
   * @return `None` when the line does not match [[PATTERN]] or a numeric field fails to convert
   */
  private def parseRecord(log: String): Option[LogRecord] =
    PATTERN.findFirstMatchIn(log) match {
      case None =>
        println("Rejected Log Line: " + log)
        None
      case Some(m) =>
        try {
          Some(
            LogRecord(
              date_time = m.group(1).split(",")(0), // drop the ",millis" suffix
              log_level = m.group(4),
              http_verb = m.group(6),
              path = m.group(7),
              response_time = m.group(9).dropRight(2).toInt, // "29ms" -> 29
              status_code = m.group(12).toInt,
              client_ip = m.group(13)
            )
          )
        } catch {
          case NonFatal(e) =>
            println("Exception on line:" + log + ":" + e.getMessage)
            None
        }
    }
}
@pandeykumar
Copy link
Author

Sample log lines this code processes (the second line carries an extra ID token between the timestamp and the bracketed thread name):
2016-09-13 00:32:41,861 [play-akka.actor.default-dispatcher-29] INFO application - GET /api/transaction/search?date=1473726761&timezone=America/Los_Angeles took 29ms and returned 200 [201.72.178.16]
2016-09-13 00:32:53,857 1qZKVvP8CeHW5bEdK76j [play-akka.actor.default-dispatcher-42] INFO application - GET /healthcheck.html took 2ms and returned 200 [10.164.21.159]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment