# Get the details of the song with ID 9
$ http://localhost:7070/kafka-music/song/9
{
"artist":"N.W.A",
"album":"Straight Outta Compton",
"name":"Gangsta Gangsta"
// Get the window store named "queryStoreName"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("queryStoreName", QueryableStoreTypes.windowStore());
// Fetch values for the key "europe" for all of the windows available in this application instance.
// To get *all* available windows we fetch windows from the beginning of time until now.
long timeFrom = 0; // beginning of time = oldest available
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("europe", timeFrom, timeTo);
while (iterator.hasNext()) {
KTable<Windowed<String>, Long> aggregated = inputStream
    .groupByKey()
    .reduce((aggValue, newValue) -> aggValue + newValue,
            TimeWindows.of(TimeUnit.MINUTES.toMillis(2))
                       .until(TimeUnit.DAYS.toMillis(1) /* keep for one day */),
            "queryStoreName");
PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
                 .triggering(
                     AtWatermark()
                         .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                         .withLateFirings(AtCount(1)))
                 .accumulatingAndRetractingFiredPanes())
    .apply(Sum.integersPerKey())
# Get the details of the song with ID 9
$ http://localhost:7070/kafka-music/song/9
{
"artist":"N.W.A",
"album":"Straight Outta Compton",
"name":"Gangsta Gangsta"
# Get the top-5 chart for the genre "punk"
$ http://localhost:7070/kafka-music/charts/genre/punk
[
{
"artist":"Wheres The Pope?",
"album":"PSI",
# Get all the top-5 charts across all instances
$ http://localhost:7070/kafka-music/charts/top-five
[
{
"artist":"Hilltop Hoods",
"album":"The Calling",
# List running app instances that currently manage (parts of) state store "top-five-songs"
$ http://localhost:7070/kafka-music/instances/top-five-songs
[
{
"host": "localhost",
"port": 7070,
# List all running instances of this application
$ http://localhost:7070/kafka-music/instances
[
{
"host": "localhost",
"port": 7070,