Created
June 13, 2019 20:33
-
-
Save mtopolnik/84f788599827a4e63c7ae98ea905534e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>com.hazelcast.jet.prototype</groupId> | |
<artifactId>prototype</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<dependencies> | |
<dependency> | |
<groupId>com.hazelcast.jet</groupId> | |
<artifactId>hazelcast-jet</artifactId> | |
<version>3.0</version> | |
</dependency> | |
<dependency> | |
<groupId>log4j</groupId> | |
<artifactId>log4j</artifactId> | |
<version>1.2.17</version> | |
</dependency> | |
</dependencies> | |
<properties> | |
<maven.compiler.target>1.8</maven.compiler.target> | |
<maven.compiler.source>1.8</maven.compiler.source> | |
</properties> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package testing; | |
import com.hazelcast.jet.Jet; | |
import com.hazelcast.jet.JetInstance; | |
import com.hazelcast.jet.accumulator.LongLongAccumulator; | |
import com.hazelcast.jet.aggregate.AggregateOperation; | |
import com.hazelcast.jet.datamodel.KeyedWindowResult; | |
import com.hazelcast.jet.pipeline.Pipeline; | |
import com.hazelcast.jet.pipeline.Sinks; | |
import com.hazelcast.jet.pipeline.SourceBuilder; | |
import com.hazelcast.jet.pipeline.StreamSource; | |
import com.hazelcast.jet.pipeline.StreamStage; | |
import java.io.Serializable; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.LockSupport; | |
import static com.hazelcast.jet.aggregate.AggregateOperations.counting; | |
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding; | |
public class Prototype { | |
public static void main(String[] args) { | |
Pipeline p = Pipeline.create(); | |
StreamStage<Auth> input = p.drawFrom(AuthSource.authSource()).withNativeTimestamps(10); | |
input.groupingKey(Auth::getAccountId) | |
.window(sliding(TimeUnit.SECONDS.toMillis(1), 10)) | |
.aggregate(counting()) | |
.filter(e -> e.getValue() > 1) | |
.drainTo(Sinks.logger(kwr -> "alarm: accountId = " + kwr.key() + ", count = " + kwr.result())); | |
input.groupingKey(Auth::getAccountId) | |
.window(sliding(TimeUnit.DAYS.toMillis(1), TimeUnit.SECONDS.toMillis(1))) | |
.aggregate(counting()) | |
.groupingKey(KeyedWindowResult::key) | |
.rollingAggregate(AggregateOperation | |
.withCreate(LongLongAccumulator::new) | |
.<KeyedWindowResult<Long, Long>>andAccumulate((acc, kwr) -> { | |
acc.set1(acc.get2()); | |
acc.set2(kwr.result()); | |
}) | |
.andExportFinish(acc -> acc.get1() == acc.get2() ? null : acc.get2())) | |
.filter(e -> e.getValue() != null) | |
.drainTo(Sinks.logger(e -> "slow result: accountId = " + e.getKey() + ", count = " + e.getValue())); | |
JetInstance jet = Jet.newJetInstance(); | |
jet.newJob(p); | |
} | |
} | |
class Auth implements Serializable { | |
long timestamp; | |
long accountId; | |
long amount; | |
String merchant; | |
public Auth(long timestamp, long accountId, long amount, String merchant) { | |
this.timestamp = timestamp; | |
this.accountId = accountId; | |
this.amount = amount; | |
this.merchant = merchant; | |
} | |
public long getTimestamp() { | |
return timestamp; | |
} | |
public long getAccountId() { | |
return accountId; | |
} | |
public long getAmount() { | |
return amount; | |
} | |
public String getMerchant() { | |
return merchant; | |
} | |
} | |
class AuthSource { | |
private static final String[] merchants = { | |
"Target", "Walmart", "Khols" | |
}; | |
private static final int NUM_ACCOUNTS = 50; | |
private static final int MAX_AUTH_AMOUNT = 1000; | |
public static StreamSource<Auth> authSource() { | |
return SourceBuilder.timestampedStream("trade-source", x -> new AuthGenerator(100)) | |
.fillBufferFn(AuthGenerator::fillBuffer) | |
.build(); | |
} | |
private static class AuthGenerator { | |
private final int tps; | |
AuthGenerator(int tps) { | |
this.tps = tps; | |
} | |
void fillBuffer(SourceBuilder.TimestampedSourceBuffer<Auth> buffer) { | |
ThreadLocalRandom rnd = ThreadLocalRandom.current(); | |
for (int i = 0; i < tps; i++) { | |
long timestamp = System.currentTimeMillis(); | |
Auth auth = new Auth(timestamp, rnd.nextLong(NUM_ACCOUNTS), rnd.nextLong(MAX_AUTH_AMOUNT), | |
merchants[rnd.nextInt(merchants.length)]); | |
buffer.add(auth, timestamp); | |
} | |
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); // sleep for 1 second | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment