Skip to content

Instantly share code, notes, and snippets.

@charles-dyfis-net
Created September 13, 2012 17:05
Show Gist options
  • Save charles-dyfis-net/3715841 to your computer and use it in GitHub Desktop.
Save charles-dyfis-net/3715841 to your computer and use it in GitHub Desktop.
<?xml version="1.0"?>
<!--
Maven build descriptor for a standalone reproducer project.
It declares Esper as the only dependency, compiles at the Java 1.6 level,
and launches repro.RunningCounterTest through the exec plugin during the
test phase.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates. -->
<groupId>net.dyfis</groupId>
<artifactId>esper-testcase</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>Esper bug reproducer</name>
<dependencies>
<!-- The Esper version the reproducer is built and run against. -->
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
<version>4.7.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile with Java 1.6 source and target levels. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<!-- <showDeprecation>true</showDeprecation> -->
<skip>false</skip>
</configuration>
</plugin>
<!--
Bind the exec plugin's java goal to the test phase so that a build reaching
that phase runs the reproducer's main class.
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.1.1</version>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>java</goal>
</goals>
<configuration>
<!-- Entry point: the class with the main method defined in package repro. -->
<mainClass>repro.RunningCounterTest</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package repro;
import com.espertech.esper.client.*;
import com.espertech.esper.event.map.MapEventBean;
import com.espertech.esper.client.time.CurrentTimeEvent;
import com.espertech.esper.client.time.TimerControlEvent;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Self-contained driver that sets up an Esper engine with an externally
 * controlled clock, registers a small hierarchy of map-based event types,
 * installs one multi-way outer-join statement and feeds it a fixed sequence
 * of events. Every statement update is printed to standard output.
 */
public class RunningCounterTest {
    /** Milliseconds per second. */
    private static final long SECONDS = 1000;

    EPServiceProvider esperProvider;
    EPAdministrator esperAdmin;
    EPRuntime esperRuntime;
    ConfigurationOperations esperConfig;
    private final LoggingUpdateListener loggingUpdateListener;

    /**
     * Listener that prints each new event delivered by a statement, and for
     * map-backed events also prints every property.
     *
     * Declared static because it uses no state of the enclosing instance.
     */
    private static class LoggingUpdateListener implements StatementAwareUpdateListener {
        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider) {
            // An update that only carries old events may deliver no new-event
            // array at all; there is nothing to print in that case.
            if (newEvents == null) {
                return;
            }
            for (EventBean newEvent : newEvents) {
                System.out.println("Triggered statement " + statement.getName() + ": " + newEvent);
                if (newEvent instanceof MapEventBean) {
                    printProperties((MapEventBean) newEvent);
                }
            }
        }

        /** Prints one line per property of a map-backed event. */
        private static void printProperties(MapEventBean mapEvent) {
            System.out.println(" Map with fields:");
            Map<String, Object> props = mapEvent.getProperties();
            for (Map.Entry<String, Object> entry : props.entrySet()) {
                // Concatenation renders a null value as "null" instead of
                // throwing, which matters for unmatched outer-join streams.
                System.out.println(" " + entry.getKey() + " => " + entry.getValue());
            }
        }
    }

    /**
     * Creates and initializes the engine instance named "RunningCounterTest"
     * with the internal timer disabled, then sets engine time to 0.
     */
    RunningCounterTest() {
        loggingUpdateListener = new LoggingUpdateListener();
        Configuration config = new Configuration();
        config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);
        esperProvider = EPServiceProviderManager.getProvider("RunningCounterTest", config);
        esperProvider.initialize();
        esperAdmin = esperProvider.getEPAdministrator();
        esperConfig = esperAdmin.getConfiguration();
        esperRuntime = esperProvider.getEPRuntime();
        // The clock type must be wrapped in a TimerControlEvent; sending the
        // bare enum constant would hand the engine an ordinary object event.
        esperRuntime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
        setTime(0);
    }

    /**
     * Registers a map-based event type.
     *
     * (small but sufficient subset of the Clojure implementation)
     *
     * @param event_name   name of the event type to register
     * @param names        property names, parallel to {@code types}
     * @param types        property types, parallel to {@code names}
     * @param superclasses names of the event types this type inherits from
     * @throws IllegalArgumentException if {@code names} and {@code types}
     *         differ in length
     */
    void defineEventType(String event_name, String[] names, Object[] types, String[] superclasses) {
        requireSameLength("event type " + event_name, names.length, types.length);
        Map<String, Object> typeMap = new HashMap<String, Object>();
        for (int i = 0; i < names.length; i++) {
            typeMap.put(names[i], types[i]);
        }
        Set<String> supertypes = new HashSet<String>();
        for (String superclass : superclasses) {
            supertypes.add(superclass);
        }
        ConfigurationEventTypeMap extraConfig = new ConfigurationEventTypeMap();
        extraConfig.setSuperTypes(supertypes);
        esperConfig.addEventType(event_name, typeMap, extraConfig);
    }

    /**
     * Advances the externally controlled engine clock.
     *
     * @param msTime the time value passed to {@code CurrentTimeEvent}
     */
    void setTime(long msTime) {
        esperRuntime.sendEvent(new CurrentTimeEvent(msTime));
    }

    /**
     * Creates a named EPL statement and attaches the logging listener to it.
     * An existing statement with the same name is destroyed first.
     *
     * @param statementName name under which the statement is registered
     * @param statementText EPL text of the statement
     */
    void defineStatement(String statementName, String statementText) {
        EPStatement oldQuery = esperAdmin.getStatement(statementName);
        if (oldQuery != null) {
            oldQuery.destroy();
        }
        EPStatement newQuery = esperAdmin.createEPL(statementText, statementName);
        newQuery.addListener(loggingUpdateListener);
    }

    /**
     * Builds a map event from parallel name/value arrays and sends it.
     *
     * @param eventName       name of the registered event type
     * @param eventProperties property names, parallel to {@code eventValues}
     * @param eventValues     property values, parallel to {@code eventProperties}
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    void insertEvent(String eventName, String[] eventProperties, Object[] eventValues) {
        requireSameLength("event " + eventName, eventProperties.length, eventValues.length);
        Map<String, Object> eventMap = new HashMap<String, Object>();
        for (int i = 0; i < eventProperties.length; i++) {
            eventMap.put(eventProperties[i], eventValues[i]);
        }
        esperRuntime.sendEvent(eventMap, eventName);
    }

    /** Rejects parallel arrays of different lengths with a descriptive message. */
    private static void requireSameLength(String what, int nameCount, int valueCount) {
        if (nameCount != valueCount) {
            throw new IllegalArgumentException(
                    what + ": " + nameCount + " names but " + valueCount + " values");
        }
    }

    /**
     * Runs the scenario: registers the event types, installs the
     * "running-counter" statement, then sends one GroupStart, two
     * ElementStart, one ElementFail and one ElementSuccess event, all with
     * key "one", advancing the clock by 10 ms around each.
     */
    void run() {
        setTime(0);

        // Event type hierarchy:
        // KeyedEvent <- InstanceEvent <- ElementStart / ElementSuccess / ElementFail
        // KeyedEvent <- GroupStart
        defineEventType("KeyedEvent",
                new String[] {"key"},
                new Object[] {java.lang.String.class},
                new String[] {});
        defineEventType("InstanceEvent",
                new String[] {"instance"},
                new Object[] {java.lang.String.class},
                new String[] {"KeyedEvent"});
        defineEventType("GroupStart",
                new String[] {},
                new Object[] {},
                new String[] {"KeyedEvent"});
        defineEventType("ElementStart",
                new String[] {},
                new Object[] {},
                new String[] {"InstanceEvent"});
        defineEventType("ElementSuccess",
                new String[] {},
                new Object[] {},
                new String[] {"InstanceEvent"});
        defineEventType("ElementFail",
                new String[] {},
                new Object[] {},
                new String[] {"InstanceEvent"});

        defineStatement("running-counter",
                "select " +
                " group_start, count(inst_start), count(inst_success), count(inst_fail) " +
                "from " +
                " GroupStart.win:time(3 hours) group_start " +
                " left outer join ElementStart.win:time(3 hours) inst_start " +
                " on group_start.key = inst_start.key " +
                " left outer join ElementSuccess.win:time(3 hours) inst_success " +
                " on group_start.key = inst_success.key " +
                " left outer join ElementFail.win:time(3 hours) inst_fail " +
                " on group_start.key = inst_fail.key " +
                "group by group_start.key");

        setTime(10);
        insertEvent("GroupStart",
                new String[] {"key"},
                new Object[] {"one"});
        setTime(20);
        insertEvent("ElementStart",
                new String[] {"key", "instance"},
                new Object[] {"one", "foo"});
        setTime(30);
        insertEvent("ElementStart",
                new String[] {"key", "instance"},
                new Object[] {"one", "bar"});
        setTime(40);
        insertEvent("ElementFail",
                new String[] {"key", "instance"},
                new Object[] {"one", "foo"});
        setTime(50);
        insertEvent("ElementSuccess",
                new String[] {"key", "instance"},
                new Object[] {"one", "bar"});
        setTime(60);
    }

    /** Entry point: builds the engine and runs the scenario once. */
    public static void main(String[] args) {
        RunningCounterTest instance = new RunningCounterTest();
        instance.run();
    }
}
/*
* vim: ai et sts=4 sw=4 ts=4
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment