-
-
Save vkhitev/921bee13257e9761d75d67daf13e781a to your computer and use it in GitHub Desktop.
Dremio & Spark & Kotlin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@file:Suppress("UsePropertyAccessSyntax") | |
import org.apache.log4j.PropertyConfigurator | |
import org.apache.spark.sql.Dataset | |
import org.apache.spark.sql.Row | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions.* | |
import scala.collection.JavaConversions | |
// Resulting columns: host, time, verb, resource, status, bytes
/**
 * Casts the raw JDBC log DataFrame into a typed schema.
 *
 * Splits the raw "request" string on spaces to extract the HTTP verb and
 * the requested resource, parses "request_time" into a proper timestamp,
 * and narrows "status" to a short. The temporary split column is dropped.
 *
 * Fix: removed the gratuitous `!!` on `.drop()` — the Java Spark API returns
 * a platform type, so declaring an explicit non-null return type is the
 * idiomatic way to pin nullability instead of a runtime assertion.
 */
fun castDf(df: Dataset<Row>): Dataset<Row> =
    df.withColumn("_tmp", split(col("request"), " ")).select(
        col("host"),
        // Apache log timestamp, e.g. 21/Jul/2014:10:00:00.
        // NOTE(review): pattern has no timezone offset — assumes the source
        // column carries none; confirm against the Dremio "logs" table.
        unix_timestamp(
            col("request_time"), "dd/MMM/yyyy:HH:mm:ss"
        ).cast("timestamp").alias("time"),
        col("_tmp").getItem(0).alias("verb"),
        col("_tmp").getItem(1).alias("resource"),
        col("status").cast("short"),
        col("bytes")
    ).drop("_tmp")
/**
 * Entry point: connects to a local Dremio instance over JDBC, loads the
 * "@admin"."logs" table into Spark, casts it into a typed schema, and runs
 * a series of exploratory analyses, printing each result to stdout.
 */
fun main(args: Array<String>) {
    PropertyConfigurator.configure("src/main/resources/log4j.properties")
    // Required on Windows so Hadoop/Spark can locate winutils.exe.
    System.setProperty("hadoop.home.dir", "c:/winutil/")
    // Register the Dremio JDBC driver before Spark opens a connection.
    Class.forName("com.dremio.jdbc.Driver")
    val spark = SparkSession.builder()
        .appName("Dremio-Spark example")
        .master("local[*]")
        .getOrCreate()
    // SparkSession is Closeable; use {} guarantees the session is stopped
    // even if one of the analyses throws.
    spark.use {
        val dfRaw = spark.read()
            .format("jdbc")
            .option("url", "jdbc:dremio:direct=127.0.0.1:31010")
            .option("dbtable", """"@admin"."logs"""")
            // SECURITY: credentials are hard-coded; move them to environment
            // variables or a secrets store before sharing/deploying.
            .option("user", "admin")
            .option("password", "admin123qwe")
            .load()
        val df = castDf(dfRaw)
        df.printSchema()
        contentSizeStatistics(df)
        statusAnalysis(df)
        hostAnalysis(df)
        topErrorPaths(df)
        uniqueHosts(df)
        uniqueDailyHosts(df)
        averageNumberOfDailyRequestsPerHost(df)
    }
}
/**
 * Prints summary statistics (count, mean, stddev, min, max) for the
 * response content-size column.
 */
fun contentSizeStatistics(df: Dataset<Row>) {
    println("Statistics based on the content size")
    val byteStats = df.describe("bytes")
    byteStats.show()
}
/**
 * Prints how many requests fall under each HTTP status code,
 * ordered by status code ascending.
 */
fun statusAnalysis(df: Dataset<Row>) {
    println("Count of requests grouped by status code")
    val countsPerStatus = df.groupBy("status").count()
    countsPerStatus.sort("status").show()
}
/**
 * Prints the hosts that accessed the server more than 700 times,
 * ordered by request count descending.
 *
 * Fix: corrected typo in the user-facing message ("accesed" -> "accessed").
 */
fun hostAnalysis(df: Dataset<Row>) {
    println("Hosts that accessed the server more than 700 times")
    df.groupBy("host")
        .count()
        .filter(col("count").gt(700))
        .sort(col("count").desc())
        .show()
}
/**
 * Prints the ten most frequently requested resources that resulted
 * in an HTTP 404, ordered by hit count descending.
 */
fun topErrorPaths(df: Dataset<Row>) {
    println("Top 10 error paths")
    val notFound = df.filter(col("status").equalTo(404))
    val hitsPerResource = notFound.groupBy("resource").count()
    hitsPerResource.sort(col("count").desc()).show(10)
}
/**
 * Counts the distinct hosts that appear in the log and prints the total.
 */
fun uniqueHosts(df: Dataset<Row>) {
    val count = df.select("host").distinct().count()
    println("Number of unique hosts: $count\n")
}
/**
 * Prints, for each (month, day) pair, the number of distinct hosts that
 * made at least one request that day, ordered by count descending.
 */
fun uniqueDailyHosts(df: Dataset<Row>) {
    println("Unique daily hosts")
    // One row per (host, month, day) combination.
    val hostDays = df.select(
        col("host"),
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).distinct()
    val hostsPerDay = hostDays.groupBy("month", "day").count()
    hostsPerDay.sort(desc("count")).show()
}
/**
 * Prints, for each (month, day) pair, the average number of requests per
 * distinct host on that day: total requests divided by unique hosts,
 * ordered by the average descending.
 */
fun averageNumberOfDailyRequestsPerHost(df: Dataset<Row>) {
    println("Average number of daily requests per host")
    // Unique hosts seen on each (month, day).
    val hostsPerDay = df.select(
        col("host"),
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).distinct().groupBy("month", "day").count()
    // Total requests on each (month, day).
    val requestsPerDay = df.select(
        month(col("time")).alias("month"),
        dayofmonth(col("time")).alias("day")
    ).groupBy("month", "day").count()
    // Join on both key columns to avoid ambiguous-column issues,
    // then disambiguate the two "count" columns via their parent datasets.
    val joinKeys = JavaConversions.asScalaBuffer(listOf("month", "day")).toSeq()
    requestsPerDay.join(hostsPerDay, joinKeys)
        .select(
            col("month"),
            col("day"),
            requestsPerDay.col("count").divide(hostsPerDay.col("count"))
                .alias("avg")
        )
        .sort(col("avg").desc())
        .show()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment