Skip to content

Instantly share code, notes, and snippets.

/**
 * Collects the distinct, non-null values of a single column to the driver.
 *
 * @param df  DataFrame to read from
 * @param col name of the column to extract; values are read with `getString`
 * @return the distinct non-null values of `col` as a local array
 */
def getUniqueString(
  df : DataFrame,
  col : String
) : Array[String] = {
  // Narrow to the one column first so that null-dropping and de-duplication
  // only look at that column.
  val distinctValues = df.select(col).na.drop.dropDuplicates
  distinctValues.map(row => row.getString(0)).collect
}
/**
 * Builds a two-argument Spark UDF that tests whether the first timestamp is
 * less than a given number of milliseconds after the second.
 *
 * The Long argument is the window length in milliseconds (it is compared
 * against a difference of `Timestamp.getTime` values). The resulting UDF
 * returns true when `s.getTime - t.getTime` is strictly smaller than the
 * window; this includes every case where `s` is earlier than `t`, because
 * the difference is then negative.
 */
val filterRecentTimeUDF : Long => UserDefinedFunction = {
  (windowMilli : Long) =>
    functions.udf(
      // s refers to eventTime, t refers to userEventTime
      // The comparison already yields a Boolean, so no if/else wrapper is needed.
      (s : Timestamp, t : Timestamp) => s.getTime - t.getTime < windowMilli
    )
}
// Variant of timePartition that is declared to return a single DataFrame and
// takes the name of an output column.
//
// Only the set-up is visible in this excerpt: a week-length constant and a UDF
// that maps a timestamp to a week index relative to `startTime`.
// NOTE(review): the excerpt ends before `udf`, `data`, `timeCol` or `outputCol`
// are used and before any DataFrame is returned; presumably the UDF is applied
// to `timeCol` and stored in `outputCol` -- confirm against the full source.
def timePartition (
data: DataFrame,
timeCol : String,
outputCol : String,
// Epoch milliseconds; the default corresponds to 2014-07-01T07:00:00Z.
startTime : Long = 1404198000000L
) : DataFrame = {
// One week in milliseconds (7 * 24 * 60 * 60 * 1000).
val addTime : Long = 604800000L // Discretize time into weeks.
// Week index of a timestamp: whole weeks elapsed since startTime.
// Long division truncates toward zero, so timestamps less than one week
// before startTime also map to index 0.
val udf : UserDefinedFunction = functions.udf(
(t : Timestamp) => ((t.getTime - startTime) / addTime).toInt
)
/**
 * Splits `data` into cumulative weekly snapshots.
 *
 * Cut-off k (k = 1, 2, ...) is `startTime + k` weeks, and the k-th snapshot
 * keeps the rows whose `timeCol` value is strictly before that cut-off.
 * Cut-offs are generated up to the first one that lies after the current
 * wall-clock time, so the last snapshot also covers the current, partial week.
 *
 * @param data      DataFrame to split
 * @param timeCol   name of the column compared against each cut-off timestamp
 * @param startTime origin of the weekly grid, in epoch milliseconds
 * @return one filtered DataFrame per weekly cut-off, in chronological order
 *
 * NOTE(review): the closing brace of this definition is not part of this
 * excerpt, so none is emitted here either.
 */
def timePartition (
  data: DataFrame,
  timeCol : String,
  startTime : Long = 1404198000000L
) : Seq[DataFrame] = {
  val addTimeMilli : Long = 604800000L // Discretize time into weeks.
  val rightNowMilli : Long = new Date().getTime
  // Count the weeks elapsed since startTime. Dividing the raw epoch time by the
  // week length would count weeks since 1970 and produce thousands of cut-offs
  // that lie decades in the future, each yielding an identical full-data frame.
  val elapsedMilli : Long = math.max(0L, rightNowMilli - startTime)
  // +1 so that the final cut-off is after "now" and no current data is dropped.
  val numCutoffs : Int = (elapsedMilli / addTimeMilli).toInt + 1
  (1 to numCutoffs)
    .map(k => new Timestamp(startTime + k.toLong * addTimeMilli))
    .map(cutoff => data.filter(data(timeCol) < cutoff))
// Excerpt of aggHomogeneity (reduceByKey variant). Takes an RDD of
// (Int, Int, Double) triples and is declared to return an RDD of the same shape.
// The visible part only computes, per distinct first component, the sum of the
// third component. The first component is treated as the row index (see
// "row sums" below); presumably the second is the column index -- confirm.
// NOTE(review): the excerpt ends after rowSums; the use of the sums, the
// returned RDD and the closing brace are not visible here.
def aggHomogeneity (input : RDD[(Int, Int, Double)]) : RDD[(Int, Int, Double)] = {
// Mark the RDD for caching; presumably it is traversed again further down.
input.cache
// Get row sums.
// Values are summed per key on the cluster, then the per-key totals are
// collected to the driver and expanded as varargs into a local HashMap.
val rowSums: HashMap[Int, Double] = HashMap(
input.map(e => (e._1, e._3))
.reduceByKey(_ + _)
.collect : _*
)
// Per-key totals: maps each distinct first component of `input` to the sum of
// the third components of its triples. The sums are computed with reduceByKey,
// collected to the driver and expanded as varargs into a local HashMap.
// NOTE(review): `input` is defined outside this excerpt.
val rowSums: HashMap[Int, Double] = HashMap(
input.map(e => (e._1, e._3))
.reduceByKey(_ + _)
.collect : _*
)
// Excerpt of aggHomogeneity (groupBy variant). Takes an RDD of
// (Int, Int, Double) triples and is declared to return an RDD of the same shape.
// The visible part only computes, per distinct first component, the sum of the
// third component, by grouping whole triples and folding each group.
// NOTE(review): the excerpt ends after rowSums; the use of the sums, the
// returned RDD and the closing brace are not visible here.
def aggHomogeneity (input : RDD[(Int, Int, Double)]) : RDD[(Int, Int, Double)] = {
// Mark the RDD for caching; presumably it is traversed again further down.
input.cache
// Get row sums.
// Triples are grouped by their first component, each group is folded into the
// sum of its third components (starting from 0.0), and the totals are collected
// to the driver and expanded as varargs into a local HashMap.
// NOTE(review): groupBy gathers every triple of a key before summing; mapping
// to (key, value) pairs and using reduceByKey would likely shuffle less data.
val rowSums: HashMap[Int, Double] = HashMap(
input.groupBy(_._1)
.map(e => (e._1, e._2.foldLeft(0.0)((b, a) => b + a._3)))
.collect : _*
)
// Excerpt of aggHomogeneity (groupBy variant). Takes an RDD of
// (Int, Int, Double) triples and is declared to return an RDD of the same shape.
// The visible part only computes, per distinct first component, the sum of the
// third component, by grouping whole triples and folding each group.
// NOTE(review): the excerpt ends after rowSums; the use of the sums, the
// returned RDD and the closing brace are not visible here.
def aggHomogeneity (input : RDD[(Int, Int, Double)]) : RDD[(Int, Int, Double)] = {
// Mark the RDD for caching; presumably it is traversed again further down.
input.cache
// Get row sums.
// Triples are grouped by their first component, each group is folded into the
// sum of its third components (starting from 0.0), and the totals are collected
// to the driver and expanded as varargs into a local HashMap.
// NOTE(review): groupBy gathers every triple of a key before summing; mapping
// to (key, value) pairs and using reduceByKey would likely shuffle less data.
val rowSums: HashMap[Int, Double] = HashMap(
input.groupBy(_._1)
.map(e => (e._1, e._2.foldLeft(0.0)((b, a) => b + a._3)))
.collect : _*
)
def splitDataTest (
sqlContext : SQLContext,
data : DataFrame,
rowCol : String,
colCol : String,
tokenizer : String => Array[String],
idf : Boolean = true,
numFolds : Int
) : Seq[(AssociatedData, RDD[TestObservation])] = {
// Last space-separated field of the last line printed by `hdfs dfs -ls` on
// /data/dab/prod-view, with newlines removed.
// NOTE(review): presumably that field is the path of the last entry in the
// listing; this relies on the listing format and order -- confirm.
val path : String = {
  import scala.sys.process._
  // Pipe the listing through `tail -1` and capture its standard output.
  // `.!!` is called with method syntax on purpose: written as a postfix
  // operator at the end of a line, `!!` can be parsed as an infix call that
  // takes the following line as its argument, and postfix operators also
  // require scala.language.postfixOps.
  val lastListingLine : String =
    ("/pio/hadoop/hadoop-2.5.2/bin/hdfs dfs -ls /data/dab/prod-view" #| "tail -1").!!
  lastListingLine.replace("\n", "").split(" ").last
}