Kirill Pavlov (pavlov99)
pavlov99 / group-additional-count.scala
Created February 22, 2017 02:53
Disjoint (additional) objects count with Apache Spark.
// This method uses a window function to eliminate double counting of objects that belong to multiple groups.
// `groups` is a Dataset with two columns: `id` and `group`. The first column identifies the object, the second is a group name.
// One use of this method is customer segmentation.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank

// Keep each object in exactly one group: the alphabetically first group it belongs to.
val disjointGroups = groups
  .withColumn("_rank", dense_rank().over(Window.partitionBy("id").orderBy("group")))
  .filter($"_rank" === 1)
  .drop("_rank")
// Show disjoint groups with the count of objects in each.
disjointGroups
  .groupBy("group")
  .count()
  .show()
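A minimal, hypothetical input for trying the snippet above; the column names follow the comment, the rows are made up:

import spark.implicits._

// Object 2 belongs to both "A" and "B"; dense_rank over id (ordered by group) keeps it only in "A",
// so for this toy input the disjoint counts are A -> 2 and B -> 1.
val groups = Seq((1, "A"), (2, "A"), (2, "B"), (3, "B")).toDF("id", "group")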
// NOTE: add minimum and maximum values to the thresholds so every observation falls into a bucket.
// Bucket boundaries must be strictly increasing, so the lower bound 0.0 is not repeated.
val thresholds: Array[Double] = Array(Double.MinValue) ++ (0 until 50 by 10).map(_.toDouble) ++ Array(Double.MaxValue)
// Convert the DataFrame column to an RDD[Double] and calculate the per-bucket histogram counts.
val _tmpHist = df
  .select($"column" cast "double")
  .rdd.map(r => r.getDouble(0))
  .histogram(thresholds)
// Result DataFrame contains `from`, `to` range and the `value`.
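The listing cuts off before the counts are turned back into a DataFrame; a minimal sketch of that last step, assuming a SparkSession in scope as `spark` and the `thresholds` / `_tmpHist` values from above (the name `histogramDF` is an addition):

import spark.implicits._

// Pair each bucket (from, to) with its count; bucket i covers [thresholds(i), thresholds(i + 1)).
val histogramDF = thresholds
  .sliding(2)
  .map { case Array(from, to) => (from, to) }
  .toSeq
  .zip(_tmpHist)
  .map { case ((from, to), value) => (from, to, value) }
  .toDF("from", "to", "value")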
pavlov99 / haversine.scala
Created December 19, 2016 07:52
Spherical distance calculation based on latitude and longitude with Apache Spark
// Based on the following links:
// http://andrew.hedges.name/experiments/haversine/
// http://www.movable-type.co.uk/scripts/latlong.html
import org.apache.spark.sql.functions.{atan2, cos, pow, sin, sqrt, toRadians}

df
  // Haversine intermediate term: a = sin^2(dLat / 2) + cos(lat1) * cos(lat2) * sin^2(dLon / 2)
  .withColumn("a", pow(sin(toRadians($"destination_latitude" - $"origin_latitude") / 2), 2) + cos(toRadians($"origin_latitude")) * cos(toRadians($"destination_latitude")) * pow(sin(toRadians($"destination_longitude" - $"origin_longitude") / 2), 2))
  // Great-circle distance in kilometres: d = 2 * atan2(sqrt(a), sqrt(1 - a)) * R, with mean Earth radius R = 6371 km.
  .withColumn("distance", atan2(sqrt($"a"), sqrt(-$"a" + 1)) * 2 * 6371)
>>>
+--------------+-------------------+-------------+----------------+---------------+----------------+--------------------+---------------------+--------------------+------------------+
|origin_airport|destination_airport| origin_city|destination_city|origin_latitude|origin_longitude|destination_latitude|destination_longitude| a| distance|
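As a quick sanity check outside Spark, the same haversine formula as a plain Scala function (a sketch added here, not part of the gist); it takes degrees and returns kilometres using the 6371 km mean Earth radius:

// Plain-Scala haversine distance in kilometres between two (latitude, longitude) points given in degrees.
def haversineKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) * math.pow(math.sin(dLon / 2), 2)
  2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) * 6371
}

This is handy for spot-checking individual rows of the DataFrame result above.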
pavlov99 / dump-xgboost-trees.py
Created November 2, 2016 03:20
Dump xgboost trees visualisation to the file system in pdf format.
import xgboost as xgb

model = xgb.Booster(model_file='your.model')
model.feature_names = xgtrain.feature_names  # NOTE: xgtrain is your training DMatrix with features.
model.feature_types = xgtrain.feature_types

# Number of trees in the model
num_trees = len(model.get_dump())

# Dump all of the trees to the tree folder as pdf files.
for tree_index in range(num_trees):
    dot = xgb.to_graphviz(model, num_trees=tree_index)
    dot.format = 'pdf'
    dot.render(filename='tree-{}'.format(tree_index), directory='tree')  # NOTE: output path is an assumption.
pavlov99 / group-overlap-detection.scala
Last active January 4, 2017 05:55
Check groups overlap
// Data example:
// id group
// 1  A
// 2  A
// 2  B
// In this case object `2` belongs to both groups "A" and "B".
// Rename both columns on the right-hand side to avoid ambiguous references in the self-join.
val overlappedGroups = groups.select($"id" as "_id", $"group" as "_group")
groups
  .join(overlappedGroups, ($"id" === $"_id") && ($"group" < $"_group")) // NOTE: group A < group B, so the pairs (A,B) and (B,A) are not counted twice.
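The gist stops at the join; one hedged way to summarise the result (the aggregation below is an addition, not part of the gist):

// Count how many objects each pair of groups shares; any row in the result means those two groups overlap.
val overlapCounts = groups
  .join(overlappedGroups, ($"id" === $"_id") && ($"group" < $"_group"))
  .groupBy("group", "_group")
  .count()

overlapCounts.show()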
df.select("columnName").collect().map(_.getString(0)).sorted
# Export a Hive table to CSV with a header row. NOTE: the tr-based conversion is naive and breaks if fields contain tabs or commas.
hive -e 'set hive.cli.print.header=true; select * from table_name' | tr "\t" "," > ~/table_name.csv
pavlov99 / apache-spark-boolean-operations.scala
Last active October 3, 2016 02:55
Apache Spark: boolean operations with null handling
import spark.implicits._

// Spark follows SQL three-valued logic (null = "unknown"):
//   !null -> null
//   true && null -> null,  false && null -> false
//   true || null -> true,  false || null -> null
// Assigned to a val here (the name is an addition) so the result can be reused below.
val booleanOpsDF = sc.parallelize(Array[(Int, Option[Boolean])](
    (0, Some(true)), (1, Some(false)), (3, None) // None becomes a null boolean in the DataFrame.
  )).toDF("id", "column")
  .withColumn("notColumn", !$"column")
  .withColumn("andNull", $"column" && null)
  .withColumn("orNull", $"column" || null)
  .withColumn("andFalse", $"column" && false)
  .withColumn("orFalse", $"column" || false)
  .withColumn("andTrue", $"column" && true)
  .withColumn("orTrue", $"column" || true)
scheduleRich.groupBy("division").count().orderBy($"count".desc).show()
+------------+-----+
| division|count|
+------------+-----+
| Pacific| 36|
| Central| 20|
| Atlantic| 16|
|Metropolitan| 16|
+------------+-----+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.levenshtein

// Rank candidate team matches for each scheduled competitor by edit distance between the names.
val competitorWindow = Window
  .partitionBy("date", "competitor")
  .orderBy(levenshtein($"competitor", $"short_name"))

// Fuzzy-join the schedule to the teams table on similar names (edit distance below 5).
val scheduleRich = schedule
  .join(teams, levenshtein($"competitor", $"short_name") < 5, "left_outer")
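The listing ends before `competitorWindow` is used; a hedged sketch of how it could pick the single closest team name per competitor (the row_number step below is an assumption, not shown in the gist):

import org.apache.spark.sql.functions.row_number

// Keep only the best (smallest Levenshtein distance) team match for each competitor and date.
val bestMatches = scheduleRich
  .withColumn("_match_rank", row_number().over(competitorWindow))
  .filter($"_match_rank" === 1)
  .drop("_match_rank")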