Skip to content

Instantly share code, notes, and snippets.

@mtopolnik
Created June 13, 2019 20:33
Show Gist options
  • Save mtopolnik/84f788599827a4e63c7ae98ea905534e to your computer and use it in GitHub Desktop.
Save mtopolnik/84f788599827a4e63c7ae98ea905534e to your computer and use it in GitHub Desktop.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Hazelcast Jet prototype: declares the project coordinates,
     its two library dependencies, and the Java language level used by the compiler. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this prototype artifact. -->
<groupId>com.hazelcast.jet.prototype</groupId>
<artifactId>prototype</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Hazelcast Jet, which supplies the com.hazelcast.jet.* classes imported by the Java source. -->
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>3.0</version>
</dependency>
<!-- Log4j 1.2 library. NOTE(review): presumably the logging backend for Jet's logger sink;
     nothing in the visible Java source references it directly, so confirm it is needed. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<properties>
<!-- Compile with Java 8 source syntax and emit Java 8 bytecode. -->
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
</project>
package testing;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.accumulator.LongLongAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
/**
 * Entry point of the prototype: builds a Jet pipeline over the generated {@link Auth} stream
 * and submits it to a freshly started Jet instance.
 *
 * <p>The pipeline has two branches that share the same input stage, both counting
 * authorizations per account ID and writing to the logger sink:
 * <ul>
 *   <li>an "alarm" branch over a one-second sliding window that slides by 10 ms, logging
 *       every window result whose count is greater than 1;</li>
 *   <li>a "slow result" branch over a one-day sliding window that slides by one second,
 *       logging a count only when it differs from the previous count seen for that account.</li>
 * </ul>
 */
public class Prototype {

    /**
     * Assembles the pipeline, starts a Jet instance and submits the job.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final long oneSecondMillis = TimeUnit.SECONDS.toMillis(1);
        final long oneDayMillis = TimeUnit.DAYS.toMillis(1);

        Pipeline pipeline = Pipeline.create();
        StreamStage<Auth> auths = pipeline
                .drawFrom(AuthSource.authSource())
                .withNativeTimestamps(10);

        // Alarm branch: short window, report accounts that appear more than once in it.
        auths.groupingKey(Auth::getAccountId)
             .window(sliding(oneSecondMillis, 10))
             .aggregate(counting())
             .filter(windowCount -> windowCount.getValue() > 1)
             .drainTo(Sinks.logger(hit -> "alarm: accountId = " + hit.key() + ", count = " + hit.result()));

        // Slow branch: day-long window. The accumulator remembers the previous count (slot 1)
        // and the latest count (slot 2) per account; the export yields null when they are equal
        // so that the following filter drops unchanged counts.
        auths.groupingKey(Auth::getAccountId)
             .window(sliding(oneDayMillis, oneSecondMillis))
             .aggregate(counting())
             .groupingKey(KeyedWindowResult::key)
             .rollingAggregate(AggregateOperation
                     .withCreate(LongLongAccumulator::new)
                     .<KeyedWindowResult<Long, Long>>andAccumulate((lastTwo, windowCount) -> {
                         lastTwo.set1(lastTwo.get2());
                         lastTwo.set2(windowCount.result());
                     })
                     .andExportFinish(lastTwo -> lastTwo.get1() != lastTwo.get2() ? lastTwo.get2() : null))
             .filter(change -> change.getValue() != null)
             .drainTo(Sinks.logger(change -> "slow result: accountId = " + change.getKey() + ", count = " + change.getValue()));

        // The job handle is intentionally not retained; the job is only submitted here.
        Jet.newJetInstance().newJob(pipeline);
    }
}
class Auth implements Serializable {
long timestamp;
long accountId;
long amount;
String merchant;
public Auth(long timestamp, long accountId, long amount, String merchant) {
this.timestamp = timestamp;
this.accountId = accountId;
this.amount = amount;
this.merchant = merchant;
}
public long getTimestamp() {
return timestamp;
}
public long getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
public String getMerchant() {
return merchant;
}
}
/**
 * Factory for a Jet streaming source that emits randomly generated {@link Auth} events.
 */
class AuthSource {
    /** Merchant names the generator picks from. */
    private static final String[] merchants = {"Target", "Walmart", "Khols"};
    /** Exclusive upper bound for generated account IDs. */
    private static final int NUM_ACCOUNTS = 50;
    /** Exclusive upper bound for generated amounts. */
    private static final int MAX_AUTH_AMOUNT = 1000;

    /**
     * Builds the timestamped stream source. Each source context gets its own
     * {@link AuthGenerator} configured to emit 100 events per fill.
     *
     * @return a stream source of generated {@link Auth} events
     */
    public static StreamSource<Auth> authSource() {
        return SourceBuilder
                .timestampedStream("trade-source", ctx -> new AuthGenerator(100))
                .fillBufferFn(AuthGenerator::fillBuffer)
                .build();
    }

    /**
     * Produces a fixed-size burst of random events each time the buffer is filled,
     * then pauses, which throttles the source to roughly {@code tps} events per second.
     */
    private static class AuthGenerator {
        /** Number of events added to the buffer per {@link #fillBuffer} call. */
        private final int tps;

        AuthGenerator(int tps) {
            this.tps = tps;
        }

        /**
         * Adds {@code tps} random events to the buffer, each stamped with the current
         * wall-clock time, and then parks the calling thread for about one second.
         *
         * @param buffer the source buffer that receives the events and their timestamps
         */
        void fillBuffer(SourceBuilder.TimestampedSourceBuffer<Auth> buffer) {
            final ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int remaining = tps; remaining > 0; remaining--) {
                final long now = System.currentTimeMillis();
                final long accountId = random.nextLong(NUM_ACCOUNTS);
                final long amount = random.nextLong(MAX_AUTH_AMOUNT);
                final String merchant = merchants[random.nextInt(merchants.length)];
                // The same instant serves as the event's payload timestamp and its stream timestamp.
                buffer.add(new Auth(now, accountId, amount, merchant), now);
            }
            // Throttle: wait roughly one second before the next burst.
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment