Skip to content

Instantly share code, notes, and snippets.

@aalexandrov
Last active April 24, 2017 05:34
Show Gist options
  • Save aalexandrov/c0ac7f6f1fc8c16897fd2c558845e668 to your computer and use it in GitHub Desktop.
Save aalexandrov/c0ac7f6f1fc8c16897fd2c558845e668 to your computer and use it in GitHub Desktop.
Stream API Playground
/*
* Copyright © 2014 TU Berlin (emma@dima.tu-berlin.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.emmalanguage
package api
/**
 * Playground for a stream-correlation API.
 *
 * Contains a minimal pull-based [[stream.Stream]] abstraction, two wrappers that attach
 * timestamps to elements and derive a watermark from them, a handful of mock event types and
 * sources, and an unimplemented (`???`) sketch of a `correlate { ... }` syntax.
 *
 * Everything backed by `???` is deferred (`lazy val` / `def`), so the object itself can be
 * initialised and its implemented parts used; only touching a mock throws
 * `NotImplementedError`.
 */
object stream {

  /**
   * A point in time as a raw `Long`.
   *
   * [[ProcessingTimeStream]] fills it from `System.nanoTime()`, i.e. nanoseconds. The unit used
   * by [[EventTimeStream]] is whatever its `timeOf` function returns.
   */
  type Time = Long

  /** A pull-based source of elements of type `E`. */
  trait Stream[E] {

    /** Returns the next element of the stream. */
    def next(): E

    /**
     * Returns the current watermark of the stream.
     *
     * The implementations in this object never let this value decrease between calls.
     */
    def watermark(): Time
  }

  // event types

  /** A `value` paired with the `timestamp` that was assigned to it. */
  trait Timestamped[E] {
    val value: E
    val timestamp: Time
  }

  /** Mock event: user `userID` clicked on ad `adID` at `time`. */
  final case class AdClick(
    adID: Long,
    userID: Long,
    time: Time
  )

  /** Mock event: ad `adID` was shown to user `userID` on page `url` at `time`. */
  final case class AdShow(
    adID: Long,
    userID: Long,
    url: String,
    time: Time
  )

  /** Builds a [[Timestamped]] from an already computed value / timestamp pair. */
  private def timestamped[E](v: E, t: Time): Timestamped[E] =
    new Timestamped[E] {
      val value: E = v
      val timestamp: Time = t
    }

  /**
   * Computes `a - b`, clamping to `Long.MinValue` / `Long.MaxValue` instead of wrapping around.
   *
   * Uses the same sign test as `java.lang.Math.subtractExact`: the subtraction overflowed iff
   * the operands have different signs and the result's sign differs from the sign of `a`.
   */
  private def saturatingMinus(a: Time, b: Time): Time = {
    val r = a - b
    if (((a ^ b) & (a ^ r)) < 0) { if (a < 0) Long.MinValue else Long.MaxValue }
    else r
  }

  /**
   * Stamps every element pulled from `parent` with `System.nanoTime()` at the moment it is
   * pulled.
   *
   * @param parent the stream to pull elements from
   */
  class ProcessingTimeStream[E](
    parent: Stream[E]
  ) extends Stream[Timestamped[E]] {

    // largest timestamp handed out so far; Long.MinValue until the first call to next()
    private var maxTime: Time = Long.MinValue

    /** Pulls the next element from `parent` and returns it with its processing timestamp. */
    def next(): Timestamped[E] = {
      val stamped = timestamped(parent.next(), System.nanoTime())
      maxTime = Math.max(maxTime, stamped.timestamp) // ensures that watermark() is monotonic
      stamped
    }

    /** Returns the largest timestamp assigned so far (`Long.MinValue` before the first one). */
    def watermark(): Time = maxTime
  }

  /**
   * Stamps every element pulled from `parent` with the time extracted from the element itself.
   *
   * @param parent the stream to pull elements from
   * @param diff   amount by which the watermark lags behind the largest timestamp seen so far
   * @param timeOf extracts the timestamp of an element
   */
  class EventTimeStream[E](
    parent: Stream[E],
    diff: Time,
    timeOf: E => Time
  ) extends Stream[Timestamped[E]] {

    // largest timestamp handed out so far; Long.MinValue until the first call to next()
    private var maxTime: Time = Long.MinValue

    /** Pulls the next element from `parent` and returns it with its extracted timestamp. */
    def next(): Timestamped[E] = {
      val element = parent.next()
      val stamped = timestamped(element, timeOf(element))
      maxTime = Math.max(maxTime, stamped.timestamp) // ensures that watermark() is monotonic
      stamped
    }

    /**
     * Returns the largest timestamp seen so far minus `diff`.
     *
     * The subtraction saturates: a plain `maxTime - diff` wraps around to a huge positive value
     * while `maxTime` is still `Long.MinValue`, which would make the watermark start in the far
     * future and then drop once the first element arrives.
     */
    def watermark(): Time = saturatingMinus(maxTime, diff)
  }

  // mock streams
  // All of these are lazy: the sources are `???`, so forcing any of them throws
  // NotImplementedError. Deferring them keeps the initialisation of `stream` itself from failing.

  lazy val adClicksSrc: Stream[AdClick] = ???
  lazy val adShowsSrc: Stream[AdShow] = ???

  // NOTE(review): a lag of 1000L is 1 microsecond if `time` is in nanoseconds — confirm the unit.
  lazy val adClicksET: Stream[Timestamped[AdClick]] =
    new EventTimeStream[AdClick](adClicksSrc, 1000L, _.time)
  lazy val adShowsET: Stream[Timestamped[AdShow]] =
    new EventTimeStream[AdShow](adShowsSrc, 1000L, _.time)

  // correlate syntax (mock)
  // None of these is implemented yet; each call throws NotImplementedError.

  /** Mock: binds a name to an element of `in` inside a `correlate` block. */
  def from[A](in: Stream[A]): A = ???

  /** Mock: delimits a correlation expression. */
  def correlate[A](block: A): A = ???

  /**
   * Mock: restricts the combinations considered by the enclosing `correlate` block.
   *
   * NOTE(review): the result type `Nothing` marks every statement after a `where` as
   * unreachable for the compiler; `Unit` is probably what is meant. Left unchanged here because
   * it is part of the signature.
   */
  def where(expr: Boolean): Nothing = ???

  /** Mock: declares the result of the enclosing `correlate` block. */
  def emit[A](expr: A): A = ???

  // examples

  /**
   * Ten seconds expressed in nanoseconds, the unit [[ProcessingTimeStream]] uses for [[Time]].
   *
   * NOTE(review): assumes the event times compared against it are in nanoseconds too — confirm.
   */
  val `10 seconds`: Time = 10L * 1000L * 1000L * 1000L

  /**
   * Example: pairs every click with a show that happened strictly before it and less than
   * `10 seconds` earlier.
   *
   * Wrapped in a method so that it is type-checked without being executed when `stream` is
   * initialised; calling it throws `NotImplementedError` until the mocks are implemented.
   */
  def clicksShortlyAfterShows(): (Timestamped[AdClick], Timestamped[AdShow]) =
    correlate {
      val click = from(adClicksET)
      val show = from(adShowsET)
      where {
        click.timestamp > show.timestamp &&
        click.timestamp - show.timestamp < `10 seconds`
      }
      emit((click, show)) // explicit tuple instead of relying on argument auto-tupling
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment