Sample implementations of Progressive Window Widening in Apache Spark
from math import ceil, log2

def pww(batches, detect):
    # max_time and max_length are assumed to come from the surrounding configuration
    t = 1
    for _ in range(ceil(log2(max_time))):
        # Generate sliding windows of width 2*t with a half-window (t) step
        windows = (batches
                   .window(2*t, t)
                   .reduce(lambda a, b: a + b))
        # Apply data mining/pattern recognition algorithm
        detect(windows)
        # Widen windows: double the batch duration and re-batch without overlap
        t *= 2
        batches = (batches
                   .window(t, t)
                   .reduce(lambda a, b: combine(a, b, max_length)))

def combine(a, b, max_length):
    ab = a + b
    if len(ab) > 2*max_length:
        # Keep the first and last max_length elements, drop the middle
        ab[max_length:len(ab) - max_length] = []
    return ab
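
The window arithmetic in the pseudocode above is easier to see on concrete numbers. Below is a minimal, framework-free Scala sketch (the object name PwwWindows and the value maxTime = 16 are illustrative, not part of the listings) that enumerates the batch durations t = 1, 2, 4, ... and the [start, start + 2t) windows each PWW level examines:

object PwwWindows extends App {
  // Assumed stream length in time units; ceil(log2(maxTime)) levels are needed.
  val maxTime = 16

  // Batch durations walked by the loop above: 1, 2, 4, 8 for maxTime = 16.
  val durations = Iterator.iterate(1)(_ * 2).takeWhile(_ < maxTime).toList

  // Each level uses windows of width 2*t sliding by t (50% overlap), so any
  // time span of length <= t lies entirely inside at least one window.
  for (t <- durations; start <- 0 until maxTime by t)
    println(s"duration=$t  window=[$start, ${start + 2 * t})")
}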

def pww(batches: org.apache.spark.streaming.dstream.DStream[(String, Vector[Syscall])],
        detect: org.apache.spark.streaming.dstream.DStream[(String, Vector[Syscall])] => Unit) = {
  // config.depth and config.max_length are taken from the surrounding configuration
  (1 to config.depth).foldLeft((batches, 1)) {
    case ((batch, batch_duration), _) => {
      // Sliding windows of twice the batch duration, with a half-window step
      val windows = batch
        .window(org.apache.spark.streaming.Seconds(2*batch_duration),
                org.apache.spark.streaming.Seconds(batch_duration))
        .reduceByKey(_ ++ _)
      // Apply data mining/pattern recognition algorithm
      detect(windows)
      // Double the batch duration for the next level
      widen(batch, batch_duration, config.max_length)
    }
  }
}

def combine[A](a: Vector[A], b: Vector[A], max_length: Int) = {
  val ab = a ++ b
  if (ab.length > 2*max_length)
    // Keep the first and last max_length elements, drop the middle
    ab.patch(max_length, Seq(), ab.length - 2*max_length)
  else
    ab
}
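
For concreteness, this is what the truncation does (the values are arbitrary, e.g. in a REPL): with max_length = 3 the middle of the concatenated sequence is dropped, so per-key vectors never grow beyond 2*max_length elements while both ends of the window are preserved.

combine(Vector(1, 2, 3, 4, 5), Vector(6, 7, 8, 9, 10), 3)
// => Vector(1, 2, 3, 8, 9, 10): the first and last 3 elements survive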

def widen(_batches: org.apache.spark.streaming.dstream.DStream[(String, Vector[Syscall])],
          _batch_duration: Int,
          max_length: Int) = {
  // Double the batch duration, re-batch with non-overlapping windows and
  // cap each key's sequence at 2*max_length elements via combine
  val batch_duration = _batch_duration*2
  val batches = _batches
    .window(org.apache.spark.streaming.Seconds(batch_duration),
            org.apache.spark.streaming.Seconds(batch_duration))
    .reduceByKey(combine(_, _, max_length))
  (batches, batch_duration)
}
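
A minimal sketch of how pww might be driven end to end, assuming a Spark Streaming context with a one-second base batch interval; parse and detectPattern are hypothetical placeholders (not part of the listings above) for turning an input line into a (process id, syscall vector) pair and for the actual detector, and the socket source and local[2] master are arbitrary choices for the example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("pww-example").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(1))   // base batch duration = 1 s

// Assumed ingestion: one text line per syscall, keyed by process id
val batches = ssc.socketTextStream("localhost", 9999)
  .map(line => parse(line))   // parse: String => (String, Vector[Syscall]), assumed

pww(batches, windows => windows.foreachRDD { rdd =>
  rdd.foreach { case (pid, syscalls) => detectPattern(pid, syscalls) }  // assumed detector
})

ssc.start()
ssc.awaitTermination()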