@vkhitev
Last active October 18, 2017 18:56
Dremio & Spark & Kotlin
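Reads an Apache-style access-log dataset ("@admin"."logs") from Dremio over JDBC into Spark, splits the raw request field into verb and resource, and runs a few aggregations: content-size statistics, request counts by status code, busiest hosts, top 404 paths, unique and daily-unique hosts, and average daily requests per host.

A minimal build sketch (not part of the gist; Gradle Kotlin DSL, the version numbers, and the local driver path are assumptions — the Dremio JDBC driver is typically downloaded from Dremio rather than resolved from Maven Central):

plugins {
    kotlin("jvm") version "1.1.51" // assumed Kotlin version
}

repositories {
    mavenCentral()
}

dependencies {
    implementation(kotlin("stdlib"))
    implementation("org.apache.spark:spark-sql_2.11:2.2.0") // assumed Spark version
    implementation(files("libs/dremio-jdbc-driver.jar"))    // hypothetical local path to the downloaded driver
}
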
@file:Suppress("UsePropertyAccessSyntax")
import org.apache.log4j.PropertyConfigurator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
import scala.collection.JavaConversions

// Parse the raw JDBC rows into typed columns: host, time, verb, resource, status, bytes
fun castDf(df: Dataset<Row>) =
    df.withColumn("_tmp", split(col("request"), " ")).select(
        col("host"),
        unix_timestamp(
            col("request_time"), "dd/MMM/yyyy:HH:mm:ss"
        ).cast("timestamp").alias("time"),
        col("_tmp").getItem(0).alias("verb"),
        col("_tmp").getItem(1).alias("resource"),
        col("status").cast("short"),
        col("bytes")
    ).drop("_tmp")!!

fun main(args: Array<String>) {
    PropertyConfigurator.configure("src/main/resources/log4j.properties")
    System.setProperty("hadoop.home.dir", "c:/winutil/")
    Class.forName("com.dremio.jdbc.Driver")

    val spark = SparkSession.builder()
        .appName("Dremio-Spark example")
        .master("local[*]")
        .getOrCreate()

    // SparkSession implements java.io.Closeable, so Kotlin's `use` stops it when the block exits
    spark.use {
        val dfRaw = spark.read()
            .format("jdbc")
            .option("url", "jdbc:dremio:direct=127.0.0.1:31010")
            .option("dbtable", """"@admin"."logs"""")
            .option("user", "admin")
            .option("password", "admin123qwe")
            .load()

        val df = castDf(dfRaw)
        df.printSchema()

        contentSizeStatistics(df)
        statusAnalysis(df)
        hostAnalysis(df)
        topErrorPaths(df)
        uniqueHosts(df)
        uniqueDailyHosts(df)
        averageNumberOfDailyRequestsPerHost(df)
    }
}

fun contentSizeStatistics(df: Dataset<Row>) {
    println("Statistics based on the content size")
    df.describe("bytes").show()
}

fun statusAnalysis(df: Dataset<Row>) {
    println("Count of requests grouped by status code")
    df.groupBy("status")
        .count()
        .sort("status")
        .show()
}

fun hostAnalysis(df: Dataset<Row>) {
    println("Hosts that accessed the server more than 700 times")
    df.groupBy("host")
        .count()
        .filter(col("count").gt(700))
        .sort(col("count").desc())
        .show()
}

fun topErrorPaths(df: Dataset<Row>) {
    println("Top 10 error paths")
    df.filter(col("status").equalTo(404))
        .groupBy("resource")
        .count()
        .sort(col("count").desc())
        .show(10)
}

fun uniqueHosts(df: Dataset<Row>) {
    val count = df.select("host")
        .distinct()
        .count()
    println("Number of unique hosts: $count\n")
}

fun uniqueDailyHosts(df: Dataset<Row>) {
    println("Unique daily hosts")
    val days = df.select(
        col("host"),
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).distinct()
    days.groupBy("month", "day")
        .count()
        .sort(desc("count"))
        .show()
}

fun averageNumberOfDailyRequestsPerHost(df: Dataset<Row>) {
    println("Average number of daily requests per host")
    // Distinct hosts per (month, day)
    val dailyHosts = df.select(
        col("host"),
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).distinct().groupBy("month", "day").count()

    // Total requests per (month, day)
    val totalReqPerDay = df.select(
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).groupBy("month", "day").count()

    // Dataset.join(right, usingColumns) takes a Scala Seq, hence the JavaConversions bridge
    val grouping = JavaConversions.asScalaBuffer(listOf("month", "day")).toSeq()
    totalReqPerDay.join(dailyHosts, grouping)
        .select(
            col("month"),
            col("day"),
            totalReqPerDay.col("count").divide(dailyHosts.col("count"))
                .alias("avg")
        ).sort(col("avg").desc()).show()
}
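
// --- Not part of the original gist: an alternative sketch that avoids the Scala
// JavaConversions bridge by computing the same per-day average in a single aggregation.
// It assumes `host` is never null, so count(host) equals the total request count.
fun averageDailyRequestsPerHostSingleAgg(df: Dataset<Row>) {
    df.select(
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day"),
        col("host")
    ).groupBy("month", "day")
        .agg(count(col("host")).divide(countDistinct(col("host"))).alias("avg"))
        .sort(col("avg").desc())
        .show()
}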