Skip to content

Instantly share code, notes, and snippets.

@fancellu
Last active December 27, 2015 14:19
Show Gist options
  • Save fancellu/48edf00bc23334b2a5d7 to your computer and use it in GitHub Desktop.
Save fancellu/48edf00bc23334b2a5d7 to your computer and use it in GitHub Desktop.
package com.barclays.buyit.mgs
import java.util.Date
import com.espertech.esper.client._
import scala.beans.BeanProperty
/** Small Esper (complex event processing) demo.
  *
  * Registers the [[EsperTest.Tick]] bean as the `StockTick` event type, creates
  * five EPL statements, attaches a console-printing listener to four of them,
  * and then pushes two fixed ticks followed by ten random `AAPL` ticks through
  * the engine.
  *
  * The object deliberately still extends `App` so that the existing members
  * (`cep`, `cepRT`, `statement`, `CEPListener`, ...) stay reachable under the
  * same names.
  */
object EsperTest extends App {

  /** Event bean sent to the engine.
    *
    * `@BeanProperty` generates the JavaBean getters (`getSymbol`, `getPrice`,
    * `getTimestamp`) alongside the Scala accessors, so the fields can be
    * referenced by name from EPL (`symbol`, `price`).
    *
    * @param symbol    ticker symbol
    * @param price     tick price
    * @param timestamp creation time in epoch milliseconds; defaults to "now"
    */
  case class Tick(
      @BeanProperty symbol: String,
      @BeanProperty price: Double,
      @BeanProperty timestamp: Long = System.currentTimeMillis
  )

  /** Sleeps for 50-99 ms, then sends one `AAPL` tick with a random whole-number
    * price in `[0, 9]` to the given runtime, echoing it to stdout first.
    *
    * @param cepRT runtime that receives the event
    */
  def generateRandomTick(cepRT: EPRuntime): Unit = {
    Thread.sleep(50 + scala.util.Random.nextInt(50))
    val price = scala.util.Random.nextInt(10).toDouble
    val tick = Tick("AAPL", price)
    println(s"Sending tick:$tick")
    cepRT.sendEvent(tick)
  }

  /** Prints every row's underlying event as `" <label>: <event>"`.
    *
    * Esper documents that the new-events array handed to a listener may be
    * null, so it is wrapped in `Option` instead of being dereferenced directly.
    */
  private def printRows(label: String, rows: Array[EventBean]): Unit =
    Option(rows).foreach(_.foreach { row =>
      println(s" $label: ${row.getUnderlying()}")
    })

  /** Builds a listener that prints newly arrived rows under the given label. */
  private def printingListener(label: String): UpdateListener = new UpdateListener {
    def update(newData: Array[EventBean], oldData: Array[EventBean]): Unit =
      printRows(label, newData)
  }

  // --- Engine setup ---------------------------------------------------------
  val cepConfig = new Configuration()
  cepConfig.addEventType("StockTick", classOf[Tick].getName())
  val cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig)
  val cepRT = cep.getEPRuntime()
  val cepAdm = cep.getEPAdministrator()

  // --- Statement 1: AAPL ticks over a length-2 window, gated by avg(price) --
  val statement = cepAdm.createEPL(
    """select * from
      |StockTick(symbol='AAPL').win:length(2)
      |having avg(price) > 6.0""".stripMargin)

  /** Prints rows produced by `statement`. */
  object CEPListener extends UpdateListener {
    def update(newData: Array[EventBean], oldData: Array[EventBean]): Unit =
      printRows("Event received", newData)
  }
  statement.addListener(CEPListener)

  // --- Statements 2 and 3: copy ticks with price > 6.0 into a derived stream,
  //     then select everything from that stream ------------------------------
  val statement2 = cepAdm.createEPL(
    """insert into StockTickFilterer select * from
      |StockTick(price>6.0)""".stripMargin)
  val statement3 = cepAdm.createEPL("select * from StockTickFilterer")

  /** Prints rows produced by `statement3`. */
  object CEPListener2 extends UpdateListener {
    def update(newData: Array[EventBean], oldData: Array[EventBean]): Unit =
      printRows("Filtered Event received", newData)
  }
  statement3.addListener(CEPListener2)

  // --- Statement 4: sum of prices over batches of two ticks -----------------
  val statement4 =
    cepAdm.createEPL("select sum(price) as sum_price from StockTick.win:length_batch(2)")
  statement4.addListener(printingListener("Aggregate Event received"))

  // --- Statement 5: every tick, with output held back to every third event --
  val statement5 = cepAdm.createEPL("select * from StockTick output every 3 events")
  statement5.addListener(printingListener("output controlled Event received"))

  // --- Drive the engine -----------------------------------------------------
  try {
    cepRT.sendEvent(Tick("IBM", 100.0))
    cepRT.sendEvent(Tick("XXX", 200.0))
    (0 to 9).foreach(_ => generateRandomTick(cepRT))
    // Same trailing pause as before; presumably leaves time for any pending
    // listener output before the program ends.
    Thread.sleep(2000)
  } finally {
    // Release the engine's resources once the demo is done.
    cep.destroy()
  }
}
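Sample output (appears to come from an earlier revision of the code: it contains no lines from the statement4/statement5 listeners or for the IBM/XXX ticks):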
Sending tick:Tick(AAPL,7.0,1383842038818)
Event received: Tick(AAPL,7.0,1383842038818)
Filtered Event received: Tick(AAPL,7.0,1383842038818)
Sending tick:Tick(AAPL,9.0,1383842038828)
Event received: Tick(AAPL,9.0,1383842038828)
Filtered Event received: Tick(AAPL,9.0,1383842038828)
Sending tick:Tick(AAPL,8.0,1383842038829)
Event received: Tick(AAPL,8.0,1383842038829)
Filtered Event received: Tick(AAPL,8.0,1383842038829)
Sending tick:Tick(AAPL,4.0,1383842038829)
Sending tick:Tick(AAPL,6.0,1383842038829)
Sending tick:Tick(AAPL,2.0,1383842038829)
Sending tick:Tick(AAPL,3.0,1383842038829)
Sending tick:Tick(AAPL,7.0,1383842038829)
Filtered Event received: Tick(AAPL,7.0,1383842038829)
Sending tick:Tick(AAPL,0.0,1383842038829)
Sending tick:Tick(AAPL,8.0,1383842038829)
Filtered Event received: Tick(AAPL,8.0,1383842038829)
Sending tick:Tick(AAPL,0.0,1383842038830)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment