Sample implementations of Progressive Window Widening in Apache Spark
from math import ceil, log2

def pww(batches, max_time, max_length, detect):
    # Progressive Window Widening: the window width doubles on every pass,
    # so ceil(log2(max_time)) passes cover pattern durations up to max_time
    t = 1
    for _ in range(ceil(log2(max_time))):
        # Generate sliding windows of width 2*t with a half-window (t) step,
        # so any pattern of duration at most t fits entirely in some window
        windows = (batches
                   .window(2*t, t)
                   .reduce(lambda a, b: a + b))
        # Apply data mining/pattern recognition algorithm
        detect(windows)
        # Widen windows
        t *= 2
        batches = (batches
                   # Double batch duration, truncating each batch
                   # to at most 2*max_length events
                   .window(t, t)
                   .reduce(lambda a, b: combine(a, b, max_length)))

def combine(a, b, max_length):
    # Concatenate two batches; if the result is longer than 2*max_length,
    # cut out the middle so only the first and last max_length events remain
    ab = a + b
    if len(ab) > 2*max_length:
        ab[max_length:len(ab) - max_length] = []
    return ab
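The truncation in combine is what keeps state bounded as windows widen: whenever the concatenation of two batches exceeds 2*max_length events, the middle is cut out, so only the first and last max_length events of each window survive. For example, with illustrative values, combine([1, 2, 3, 4], [5, 6, 7, 8], 3) concatenates eight elements and drops the middle two, returning [1, 2, 3, 6, 7, 8]. The Scala implementation below follows the same structure on keyed DStreams, so each key's event stream is windowed and truncated independently.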
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

def pww[A](batches: DStream[(String, Vector[A])],
           detect: DStream[(String, Vector[A])] => Unit) = {
  // config.depth bounds the number of widening passes;
  // config.max_length bounds the number of events kept per batch
  (1 to config.depth).foldLeft((batches, 1)) {
    case ((batch, batch_duration), _) =>
      // Sliding windows of twice the batch duration with a half-window step
      val windows = batch
        .window(Seconds(2*batch_duration), Seconds(batch_duration))
        .reduceByKey(_ ++ _)
      // Apply data mining/pattern recognition algorithm
      detect(windows)
      // Double the batch duration for the next pass
      widen(batch, batch_duration, config.max_length)
  }
}

def combine[A](a: Vector[A], b: Vector[A], max_length: Int) = {
  val ab = a ++ b
  if (ab.length > 2*max_length)
    // Cut out the middle, keeping the first and last max_length elements
    ab.patch(max_length, Seq(), ab.length - 2*max_length)
  else
    ab
}

def widen[A](batches: DStream[(String, Vector[A])],
             batch_duration: Int,
             max_length: Int) = {
  val doubled = batch_duration*2
  val widened = batches
    .window(Seconds(doubled), Seconds(doubled))
    .reduceByKey(combine(_, _, max_length))
  (widened, doubled)
}
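For context, here is one way the Scala pww could be wired into a Spark Streaming job. This is a minimal sketch under stated assumptions, not part of the original gist: the socket source, the key extraction, the placeholder detect, and the config values are all illustrative, and pww, widen, and combine from above are assumed to be in scope.

// Hypothetical wiring of pww into a Spark Streaming job; the source,
// keying, detector, and config values are illustrative assumptions.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object config {
  val depth = 10        // widening passes: windows up to 2^10 batch durations
  val max_length = 1000 // events kept per batch after truncation
}

object PwwExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pww-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // One (key, singleton-vector) pair per input line; the key (here, the
    // first whitespace-separated token) selects an independent event stream
    val batches: DStream[(String, Vector[String])] =
      ssc.socketTextStream("localhost", 9999)
        .map(line => (line.takeWhile(!_.isWhitespace), Vector(line)))

    // Placeholder detector: report the number of events per key and window
    def detect(windows: DStream[(String, Vector[String])]): Unit =
      windows.foreachRDD(_.foreach { case (k, v) =>
        println(s"$k: ${v.length} events")
      })

    pww(batches, detect)

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that pww builds all of its window stages on the DStream graph before ssc.start() is called, so every widening level runs concurrently once the context starts.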