Skip to content

Instantly share code, notes, and snippets.

@qi-qi
Created October 15, 2019 20:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qi-qi/2ee691f9d9db82f202a67bb9eb92ec56 to your computer and use it in GitHub Desktop.
Save qi-qi/2ee691f9d9db82f202a67bb9eb92ec56 to your computer and use it in GitHub Desktop.
val ranges = collect_set(struct($"from", $"to")).as("from_to")
df.groupBy($"id")
.agg(ranges)
.withColumn("bytes_sum_unique", Util.findUniqueBytesUDF($"from_to"))
// Use the BitSet from java.util.BitSet() due to performance
val findUniqueBytesUDF: UserDefinedFunction = udf { ranges: Seq[Row] =>
ranges
.map(x => (x.getAs[Int]("legit_from"), x.getAs[Int]("legit_to")))
.aggregate(new java.util.BitSet())((bitset, range) => {
bitset.set(range._1, range._2 + 1)
bitset
}, (bitset1, bitset2) => {
bitset1.or(bitset2)
bitset1
})
.cardinality
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment