Last active
December 27, 2015 14:19
-
-
Save fancellu/48edf00bc23334b2a5d7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.barclays.buyit.mgs | |
import java.util.Date | |
import com.espertech.esper.client._ | |
import scala.beans.BeanProperty | |
/**
 * Small demonstration of the Esper complex-event-processing engine.
 *
 * Registers the [[EsperTest.Tick]] bean as the `StockTick` event type, creates
 * five EPL statements (windowed average filter, insert-into stream plus a
 * select over it, batched sum, and output-rate-limited select), attaches a
 * printing listener to each selecting statement, and then pushes two fixed
 * ticks followed by ten random `AAPL` ticks through the runtime.
 */
object EsperTest extends App {

  /**
   * Event payload sent to the engine.
   *
   * `@BeanProperty` generates JavaBean getters (`getSymbol`, `getPrice`,
   * `getTimestamp`) in addition to the Scala accessors.
   *
   * @param symbol    instrument symbol, e.g. `"AAPL"`
   * @param price     tick price
   * @param timestamp creation time in epoch milliseconds; defaults to "now"
   */
  case class Tick(
    @BeanProperty symbol: String,
    @BeanProperty price: Double,
    @BeanProperty timestamp: Long = System.currentTimeMillis)

  /**
   * Listener that prints the underlying object of every newly delivered
   * event, prefixed with `label`.
   *
   * @param label text printed in front of each event
   */
  class PrintingListener(label: String) extends UpdateListener {
    override def update(newData: Array[EventBean], oldData: Array[EventBean]): Unit =
      // The listener contract allows `newData` to be null (e.g. remove-stream
      // only updates), so guard before iterating.
      Option(newData).foreach { rows =>
        rows.foreach(row => println(s"$label${row.getUnderlying()}"))
      }
  }

  /**
   * Sleeps for 50-99 ms, then sends an `AAPL` tick with a random integral
   * price in `[0, 9]` to the given runtime, logging it first.
   *
   * @param cepRT runtime that receives the generated tick
   */
  def generateRandomTick(cepRT: EPRuntime): Unit = {
    Thread.sleep(50 + scala.util.Random.nextInt(50))
    val price = scala.util.Random.nextInt(10).toDouble
    val tick = Tick("AAPL", price)
    println(s"Sending tick:$tick")
    cepRT.sendEvent(tick)
  }

  val cepConfig = new Configuration()
  cepConfig.addEventType("StockTick", classOf[Tick].getName())

  val cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig)
  val cepRT = cep.getEPRuntime()
  val cepAdm = cep.getEPAdministrator()

  // AAPL ticks, reported while the average price of the last two exceeds 6.0.
  val statement = cepAdm.createEPL(
    """select * from
      |StockTick(symbol='AAPL').win:length(2)
      |having avg(price) > 6.0""".stripMargin)

  object CEPListener extends PrintingListener(" Event received: ")

  statement.addListener(CEPListener)

  // Route every tick priced above 6.0 into the derived StockTickFilterer stream.
  val statement2 = cepAdm.createEPL(
    """insert into StockTickFilterer select * from
      |StockTick(price>6.0)""".stripMargin)

  val statement3 = cepAdm.createEPL("select * from StockTickFilterer")

  object CEPListener2 extends PrintingListener(" Filtered Event received: ")

  statement3.addListener(CEPListener2)

  // Sum of prices over consecutive batches of two ticks.
  val statement4 =
    cepAdm.createEPL("select sum(price) as sum_price from StockTick.win:length_batch(2)")
  statement4.addListener(new PrintingListener(" Aggregate Event received: "))

  // All ticks, but output is released only once every three events.
  val statement5 = cepAdm.createEPL("select * from StockTick output every 3 events")
  statement5.addListener(new PrintingListener(" output controlled Event received: "))

  try {
    cepRT.sendEvent(Tick("IBM", 100))
    cepRT.sendEvent(Tick("XXX", 200))

    for (_ <- 0 to 9)
      generateRandomTick(cepRT)

    // Keep the JVM alive briefly before shutting the engine down.
    Thread.sleep(2000)
  } finally {
    // Release the engine instance and its statements instead of leaking them.
    cep.destroy()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sample output:
Sending tick:Tick(AAPL,7.0,1383842038818)
Event received: Tick(AAPL,7.0,1383842038818)
Filtered Event received: Tick(AAPL,7.0,1383842038818)
Sending tick:Tick(AAPL,9.0,1383842038828)
Event received: Tick(AAPL,9.0,1383842038828)
Filtered Event received: Tick(AAPL,9.0,1383842038828)
Sending tick:Tick(AAPL,8.0,1383842038829)
Event received: Tick(AAPL,8.0,1383842038829)
Filtered Event received: Tick(AAPL,8.0,1383842038829)
Sending tick:Tick(AAPL,4.0,1383842038829)
Sending tick:Tick(AAPL,6.0,1383842038829)
Sending tick:Tick(AAPL,2.0,1383842038829)
Sending tick:Tick(AAPL,3.0,1383842038829)
Sending tick:Tick(AAPL,7.0,1383842038829)
Filtered Event received: Tick(AAPL,7.0,1383842038829)
Sending tick:Tick(AAPL,0.0,1383842038829)
Sending tick:Tick(AAPL,8.0,1383842038829)
Filtered Event received: Tick(AAPL,8.0,1383842038829)
Sending tick:Tick(AAPL,0.0,1383842038830)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment