
@pron
Last active May 22, 2020 05:42
// Using a hypothetical channel API, I assume I have a channel `in` with a 4096-wide buffer,
// and a channel `out` writing to Kafka.
// lookupNodeInRedis and processOnServer have semaphores of, say, 16 and 8, to *globally* limit
// concurrent use of those resources.

var processingQueue = new BufferedChannel(50); // this buffer bounds the number of processing threads

// Consume and process input
Thread.startVirtualThread(() -> {
    try (var exec = Executors.newUnboundedVirtualThreadExecutor()) {
        while (true) {
            var msg = in.receive(); // for each incoming message we start a processing thread
            var future = exec.submit(() -> {
                try {
                    var uri = lookupNodeInRedis(msg.record().value()); // blocking
                    var uriDataOffset = new UriDataOffset(uri, msg.record().value(), msg.committableOffset());
                    processOnServer(uriDataOffset.uri, uriDataOffset.data); // blocking
                    return uriDataOffset.offset;
                } catch (Exception ex) { ... }
            });
            processingQueue.send(future); // we block here if there are too many processing threads
        }
    }
});

// Collect and batch output
Thread.startVirtualThread(() -> {
    while (true) {
        var batch = new ArrayList<Result>();
        try (var exec = Executors.newUnboundedVirtualThreadExecutor()
                                 .withDeadline(Instant.now().plusSeconds(1))) {
            exec.submit(() -> {
                while (true) {
                    batch.add(processingQueue.map(Future::get).receive()); // WARNING: map must be atomic
                }
            });
        } catch (Exception ex) { ... }
        out.send(batch);
    }
}).join();
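The opening comment assumes lookupNodeInRedis and processOnServer are globally limited by semaphores of 16 and 8 permits. A minimal sketch of that pattern, using a standard java.util.concurrent.Semaphore (the method body is a hypothetical stand-in, not the real Redis lookup):

```java
import java.util.concurrent.Semaphore;

public class LimitedResource {
    // Global cap: at most 16 lookups may be in flight at once across all threads.
    private static final Semaphore REDIS_PERMITS = new Semaphore(16);

    // Hypothetical lookup; the body just echoes its input in place of the real blocking call.
    static String lookupNodeInRedis(String key) throws InterruptedException {
        REDIS_PERMITS.acquire(); // blocks when 16 lookups are already in flight
        try {
            return "node-for-" + key; // stand-in for the real blocking Redis round-trip
        } finally {
            REDIS_PERMITS.release(); // always return the permit, even on failure
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lookupNodeInRedis("k1")); // prints "node-for-k1"
    }
}
```

With virtual threads, blocking in `acquire()` is cheap, so a plain semaphore is enough to bound the resource without a dedicated thread pool.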
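BufferedChannel is part of the hypothetical channel API, not the JDK. A stand-in with the same blocking send/receive semantics can be sketched over ArrayBlockingQueue, whose bounded capacity provides the same back-pressure that bounds the number of processing threads above:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical minimal channel: send blocks when the buffer is full,
// receive blocks when it is empty.
final class BufferedChannel<T> {
    private final BlockingQueue<T> q;

    BufferedChannel(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
    }

    void send(T item) throws InterruptedException {
        q.put(item); // blocks when the buffer holds `capacity` items
    }

    T receive() throws InterruptedException {
        return q.take(); // blocks when the buffer is empty
    }
}

public class ChannelDemo {
    public static void main(String[] args) throws Exception {
        var ch = new BufferedChannel<Integer>(2);
        ch.send(1);
        ch.send(2); // buffer now full; a third send would block until a receive
        System.out.println(ch.receive() + ch.receive()); // prints 3
    }
}
```

This sketch omits the channel's `map` combinator used in the batching loop; composing transformations atomically over a channel is exactly the part the gist flags as needing care.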