Skip to content

Instantly share code, notes, and snippets.

@jsnouffer
Last active August 13, 2020 04:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsnouffer/7a8496dfb7dd7663742a6be554955e50 to your computer and use it in GitHub Desktop.
Save jsnouffer/7a8496dfb7dd7663742a6be554955e50 to your computer and use it in GitHub Desktop.
// Threshold that each record's "my_double_value" field is compared against by the
// branch predicates below. The "..." is a placeholder: supply a real value.
public static final double SOME_CONSTANT = ...;
// ... //
// Define the condition for each branch (edge) of the topology. Every predicate reads the
// numeric "my_double_value" member of the record value and compares it to SOME_CONSTANT.
Predicate<String, JsonObject> greaterThan = (key, value) ->
    value.get("my_double_value").getAsDouble() > SOME_CONSTANT;
Predicate<String, JsonObject> lessThan = (key, value) ->
    value.get("my_double_value").getAsDouble() < SOME_CONSTANT;
// epsilon is an arbitrarily small real number, such as 1e-15
Predicate<String, JsonObject> equalTo = (key, value) ->
    Math.abs(value.get("my_double_value").getAsDouble() - SOME_CONSTANT) < epsilon;

// branch() hands each record to the FIRST predicate that matches, so equalTo must be
// evaluated first. If greaterThan/lessThan ran first they would claim every value that is
// within epsilon of SOME_CONSTANT but not exactly equal to it, and the tolerance in equalTo
// would never take effect. Records matching no predicate (e.g. a NaN value) are dropped.
//
// The predicates are passed as varargs because Java does not allow creating an array of a
// parameterized type (new Predicate<>[] {...} does not compile); the varargs call only
// produces an unchecked warning, suppressed here. The result element type is JsonObject,
// matching the predicates' value type.
// NOTE(review): assumes inputStream is a KStream<String, JsonObject> - confirm at its declaration.
@SuppressWarnings("unchecked")
KStream<String, JsonObject>[] branches = inputStream.branch(equalTo, greaterThan, lessThan);

// Define an output topic for each branch (edge); indices follow the predicate order above.
branches[0].to("equal-to-topic");
branches[1].to("greater-than-topic");
branches[2].to("less-than-topic");
// Build the topology and create the Kafka Streams client for it.
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Close the client when the JVM shuts down; without this the started instance is never
// closed. The hook is registered before start().
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment