Skip to content

Instantly share code, notes, and snippets.

import scala.util.Random
object SillyShuffle {
def randomJitterShuffle[T](sortedList: List[T],
ordFun: T => Int,
copyFun: (T, Int) => T,
jitterMin: Int,
jitterMax: Int): List[T] = {
sortedList.reverse.foldLeft(List.empty[T]) {
@mgilham
mgilham / leftAttachOne.scala
Last active October 15, 2015 21:26
Spark DataFrame leftAttachOne
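/** Utility method that implements a constrained, special-purpose join more efficiently than Spark SQL's left outer join (at the time of writing).
Assumes that the left DataFrame contains exactly one row per key, and the right DataFrame contains zero or one row per key. Left rows with no matching right row are padded with nulls in the right-hand columns; keys that appear only in the right DataFrame are dropped.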
*/
def leftAttachOne(left: DataFrame, right: DataFrame, leftKeyPos: Int = 0, rightKeyPos: Int = 0): DataFrame = {
val nullableRightSchema = new StructType(right.schema.fields.map(_.copy(nullable=true)))
val nullRightRow = Row.fromSeq(Array.fill(right.schema.fields.size)(null))
val leftKeyTuples = left.rdd.map(row => (row(leftKeyPos), row))
val rightKeyTuples = right.rdd.map(row => (row(rightKeyPos), row))
val cogrouped = leftKeyTuples.cogroup(rightKeyTuples).flatMap { case (_, (leftRows, rightRows)) =>
leftRows.headOption.map(leftRow => Row.merge(leftRow, rightRows.headOption.getOrElse(nullRightRow)))
@mgilham
mgilham / SumVector.scala
Created October 14, 2015 19:58
Spark SQL sumVector UDAF
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.VectorUDT
// MLG: this is highly unoptimized, but likely good enough for now
@mgilham
mgilham / gist:93967d36e978074b2235
Last active August 29, 2015 14:04
Kafka 1->2 replicas json
#!/usr/bin/python
import sys
import json
def getAssignmentJson(line):
splut = line.split()
topic, partition, replicas = splut[1], splut[3], splut[7]
replicas = replicas.split(',')
if len(replicas) == 1: