Skip to content

Instantly share code, notes, and snippets.

@conikeec
Forked from wangzaixiang/EsperUtil.scala
Created February 12, 2012 08:06
Show Gist options
  • Save conikeec/1807184 to your computer and use it in GitHub Desktop.
Save conikeec/1807184 to your computer and use it in GitHub Desktop.
Using Esper with Scala
package demo1
import com.espertech.esper.client.EventBean
import com.espertech.esper.client.EPAdministrator
import com.espertech.esper.client.UpdateListener
import com.espertech.esper.client.EPListenable
import com.espertech.esper.client.EPServiceProvider
/** Helpers that make the Esper client API more convenient to use from Scala:
  * an event printer, a sleep shortcut, and implicit views that let a service
  * provider be used as its administrator/runtime and let a plain Scala function
  * be registered as an update listener.
  */
object EsperUtil {

  /** Prints the current time followed by the contents of both event arrays.
    *
    * Either array may be `null`; a `null` array is skipped entirely. For each
    * non-null array the element count is printed, then the underlying event
    * object of every element on its own tab-indented line.
    *
    * @param newEvents events reported as new, or `null`
    * @param oldEvents events reported as old, or `null`
    */
  def printEvents(newEvents: Array[EventBean], oldEvents: Array[EventBean]): Unit = {
    printf("\nAt: %tT ", System.currentTimeMillis())
    printBatch("New Events:", newEvents)
    printBatch("Old Events:", oldEvents)
  }

  /** Prints `label` plus the batch size and then each underlying event; does nothing for `null`. */
  private def printBatch(label: String, events: Array[EventBean]): Unit =
    Option(events).foreach { batch =>
      println(label + batch.length)
      // Both streams show the wrapped event object rather than the EventBean wrapper itself.
      batch.foreach(ev => println("\t" + ev.getUnderlying()))
    }

  /** Blocks the calling thread for `millis` milliseconds via `Thread.sleep`. */
  def sleep(millis: Int): Unit = Thread.sleep(millis)

  // The implicit views below carry explicit result types so that they stay
  // eligible regardless of where in a compilation unit they are used.

  /** Wraps an `EPListenable` so that a Scala function can be passed to `addListener`. */
  implicit def EPListenable2EPListenableX(listenable: EPListenable): EPListenableX =
    new EPListenableX(listenable)

  /** Lets administrator methods be called directly on a service provider. */
  implicit def EPServiceProvider2EPAdministrator(x: EPServiceProvider): EPAdministrator =
    x.getEPAdministrator

  /** Lets runtime methods be called directly on a service provider. */
  implicit def EPServiceProvider2EPRuntime(
      x: EPServiceProvider): com.espertech.esper.client.EPRuntime =
    x.getEPRuntime()

  /** Adds a function-accepting `addListener` overload to an `EPListenable`.
    *
    * @param admin the listenable that listeners are registered on
    */
  class EPListenableX(admin: EPListenable) {

    /** Registers `p` as an `UpdateListener`; every update is forwarded to `p`
      * with the new and old event arrays unchanged.
      *
      * @param p callback receiving `(newEvents, oldEvents)`
      */
    def addListener(p: (Array[EventBean], Array[EventBean]) => Unit): Unit = {
      admin.addListener(new UpdateListener {
        def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]): Unit =
          p(newEvents, oldEvents)
      })
    }
  }
}
package demo1
import com.espertech.esper.client.Configuration
import com.espertech.esper.client.EPServiceProviderManager
import com.espertech.esper.client.UpdateListener
import EsperUtil._
import com.espertech.esper.client.EventBean
/** Runnable demo: creates an Esper context partitioned by `account` over
  * `Withdrawal` events, attaches a printing listener to a statement running in
  * that context, and feeds it a series of withdrawals for two accounts.
  */
object KeyedSegmentContextDemo {

  /** Sets up the engine and statements, then sends ten rounds of two
    * withdrawals each, sleeping one second after every round.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    configuration.addEventType("Withdrawal", classOf[Withdrawal].getName)

    val engine = EPServiceProviderManager.getProvider(this.getClass.getName, configuration)
    engine.initialize()

    // createEPL / sendEvent are reached through the implicit views imported from EsperUtil.
    engine.createEPL("create context WithdrawalByAccount partition by account from Withdrawal")

    // The EPL text is kept flush-left so the string literal is unchanged.
    val statement = engine.createEPL("""
context WithdrawalByAccount
select irstream *, sum(amount), context.name, context.key1 from Withdrawal.win:length(5)
""")
    statement.addListener { (added, removed) =>
      printEvents(added, removed)
    }

    (1 to 10).foreach { round =>
      val first = Withdrawal("wangzx", round * 10)
      val second = Withdrawal("rainbow", round * 10 + 5)
      print("." + first + "." + second)
      engine.sendEvent(first)
      engine.sendEvent(second)
      sleep(1000)
    }
  }
}
package demo1
import scala.reflect.BeanProperty
/** A withdrawal event.
  *
  * `@BeanProperty` generates JavaBean-style getters (`getAccount`, `getAmount`)
  * alongside the usual case-class accessors.
  *
  * @param account account the withdrawal was made from
  * @param amount  amount withdrawn
  */
case class Withdrawal(@BeanProperty account: String, @BeanProperty amount: Int)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment