@dtolpin

dtolpin/pww.py

Last active Apr 3, 2016
Sample implementations of Progressive Window Widening in Apache Spark
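
The scheme: the pattern detector runs over sliding windows of width 2t with step t, and t doubles on every pass, so patterns of unknown temporal extent are eventually covered by some window; to keep memory bounded while windows widen, each re-batching step compacts window contents with combine, which retains only the first and last max_length events.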
from math import ceil, log2

def pww(batches, detect):
    # max_time (longest pattern duration, in batch units) and max_length
    # (events kept at each end of a batch) are assumed to be defined at
    # module level.
    t = 1
    for _ in range(ceil(log2(max_time))):
        # Generate sliding windows with half-window step
        windows = (batches
                   .window(2*t, t)
                   .reduce(lambda a, b: a + b))
        # Apply data mining/pattern recognition algorithm
        detect(windows)
        # Widen windows
        t *= 2
        batches = (batches
                   # Double batch duration
                   .window(t, t)
                   .reduce(lambda a, b: combine(a, b, max_length)))

def combine(a, b, max_length):
    # Concatenate two batches, dropping the middle so that only the first
    # and last max_length events are kept.
    ab = a + b
    if len(ab) - max_length > max_length:
        ab[max_length:len(ab) - max_length] = []
    return ab
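
A minimal driver sketch for the Python version, assuming pyspark; the socket source, the find_patterns detector, and the constant values below are illustrative placeholders, not part of the gist:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

max_time = 1024   # assumed: longest pattern duration, in batch intervals
max_length = 100  # assumed: events kept at each end of a batch

sc = SparkContext(appName="pww-demo")
ssc = StreamingContext(sc, batchDuration=1)

# Wrap each event in a singleton list so that `+` in pww concatenates
# event sequences.
batches = ssc.socketTextStream("localhost", 9999).map(lambda line: [line])

def find_patterns(windows):
    # Hypothetical stand-in for a real detector
    windows.pprint()

pww(batches, find_patterns)
ssc.start()
ssc.awaitTermination()

The Scala version below follows the same scheme, threading the widened stream and the current batch duration through a foldLeft; it operates on keyed streams (reduceByKey), so windows are widened per key.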
def pww(batches: org.apache.spark.streaming.dstream.
                   DStream[(String, Vector[Syscall])],
        detect: org.apache.spark.streaming.dstream.
                  DStream[(String, Vector[Syscall])] => Unit) = {
  // config.depth (number of widening passes) and config.max_length are
  // assumed to come from the surrounding application's configuration.
  (1 to config.depth).foldLeft((batches, 1)) {
    case ((batches, batch_duration), _) => {
      // Generate sliding windows with half-window step
      val windows = batches
        .window(org.apache.spark.streaming.Seconds(2*batch_duration),
                org.apache.spark.streaming.Seconds(batch_duration))
        .reduceByKey(_ ++ _)
      // Apply data mining/pattern recognition algorithm
      detect(windows)
      // Widen windows for the next pass
      widen(batches, batch_duration, config.max_length)
    }
  }
}
def combine[A](a: Vector[A], b: Vector[A], max_length: Int) = {
  // Concatenate two batches, dropping the middle so that only the first
  // and last max_length elements are kept.
  val ab = a ++ b
  if (ab.length > 2*max_length)
    ab.patch(max_length, Seq(), ab.length - 2*max_length)
  else
    ab
}
def widen(_batches: org.apache.spark.streaming.dstream.
                      DStream[(String, Vector[Syscall])],
          _batch_duration: Int,
          max_length: Int) = {
  // Double the batch duration, re-batch, and compact each batch with
  // combine so that its length stays bounded.
  val batch_duration = _batch_duration*2
  val batches = _batches
    .window(org.apache.spark.streaming.Seconds(batch_duration),
            org.apache.spark.streaming.Seconds(batch_duration))
    .reduceByKey(combine(_, _, max_length))
  (batches, batch_duration)
}
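
Both versions bound memory in the same way: combine keeps only the first and last max_length events of the concatenated batches. A quick check of that behavior against the Python combine above (values are made up):

assert combine([1, 2, 3], [4, 5, 6], max_length=2) == [1, 2, 5, 6]
assert combine([1, 2], [3], max_length=2) == [1, 2, 3]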