Skip to content

Instantly share code, notes, and snippets.

@pron
Last active May 20, 2020 15:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pron/6b2b5c3a60047ed668d6a85a24eb0c6d to your computer and use it in GitHub Desktop.
Save pron/6b2b5c3a60047ed668d6a85a24eb0c6d to your computer and use it in GitHub Desktop.
// Using a hypothetical channel API, I assume I have channel `in` with a 4096-wide buffer,
// and a channel `out` writing to Kafka.
// lookupNodeInRedis and processOnServer have semaphores of, say, 16 and 8, to *globally* limit
// concurrent use of those resources.
HighLevelLib.processInOrder(50, // how many processors
in,
HighLevelLib.batchEvery(Duration.ofSeconds(1), out),
msg ->
try {
var uri = lookupNodeInRedis(msg.record().value()); // blocking
var uriDataOffset = new UriDataOffset(uri, msg.record().value(), msg.committableOffset());
processOnServer(uriDataOffset.uri, uriDataOffset.data); // blocking
return uriDataOffset.offset;
} catch (Exception e) { ... });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment