Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save confluentgist/054002e350b0e0a1e12ddb68e9c5aae3 to your computer and use it in GitHub Desktop.
Save confluentgist/054002e350b0e0a1e12ddb68e9c5aae3 to your computer and use it in GitHub Desktop.
Testing Your Suppressions
driver.pipeInput(recordFactory.create(
/* topic */ "input",
/* key */ "A",
/* value */ "v1",
/* timestamp */ 10L
));
// Stream time is now 10L
driver.pipeInput(recordFactory.create("input", "A", "v2", 11L));
// Stream time is now 11L
driver.pipeInput(recordFactory.create("input", "A", "v3", 12L));
// Stream time is now 12L
// To keep up with the example, we advance stream time to 13
// by sending a record with a dummy key
driver.pipeInput(recordFactory.create("input", "foo", "anything", 13L));
// Stream time is now 13L
driver.pipeInput(recordFactory.create("input", "A", "v4", 11L));
// Stream time is still 13L
// If you check driver.readOutput any time up to and including now, it
// returns null, since suppress won't emit anything until the window closes,
// which happens at time 14
driver.pipeInput(recordFactory.create("input", "foo", "anything", 14L));
// Stream time is now 14L
// Now, driver.readOutput returns the count of events for "A" in the
// window starting at time 10, which is 3.
// this event gets dropped because its window is now closed :(
driver.pipeInput(recordFactory.create("input", "A", "v5", 10L));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment