Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:17
Show Gist options
  • Save smaldini/df93cc613b57b8610356 to your computer and use it in GitHub Desktop.
Save smaldini/df93cc613b57b8610356 to your computer and use it in GitHub Desktop.
Another Http Reactor with hot stream server-side
@Test
public void test5() throws Exception {
//Hot stream of data, could be injected from anywhere
Broadcaster<String> broadcaster = Broadcaster.<String>create(Environment.sharedDispatcher());
//Will go up to 16 parallel threads to proceed clients
final int MAX_PARALLEL = 16;
//Get a reference to the tail of the operation pipeline (microbatching + partitioning)
Stream<Stream<List<String>>> tail = broadcaster
//transform 10 data in a [] of 10 elements or wait up to 1 Second before emitting whatever the list contains
.buffer(10, 1, TimeUnit.SECONDS)
//create N substreams passed each time a new one is returned to the next operation
.groupBy(new Function<List<String>, Integer>(){
int i = 0;
//the partition strategy will iterate over an index i and modulate to get a number between 0..15
//that's 15 possible new substream, we need at least an incoming array to proceed to a new stream (or reuse the previously created for the same index)
public Integer apply(List<String> data){
return i++ % MAX_PARALLEL;
}
})
//make sure the generated substreams are dispatched on different dispatchers (resulting in different thread)
.map(partition -> partition.dispatchOn(Environment.cachedDispatcher()));
//create a server dispatching data on the default shared dispatcher, and serializing/deserializing as string
HttpServer<String, String> httpServer = NetStreams.httpServer(server -> server
.codec(StandardCodecs.STRING_CODEC)
.listen(8080)
.dispatcher(Environment.sharedDispatcher()));
//Listen for anything exactly hitting the root URI and route the incoming connection request to the callback
httpServer.get("/", (request) -> {
//prepare a response header to be appended first before any reply
request.addResponseHeader("X-CUSTOM", "12345");
//attach to the shared tail, take the most recent generated substream and merge it to the high level stream
//returning a stream of String from each microbatch merged
return tail
.take(1)
//make the partition the actual stream returned
.flatMap(partition -> partition)
//split each microbatch data into individual data
.flatMap(microbatch -> Streams.from(microbatch));
});
httpServer.start().awaitSuccess();
for (int i = 0; i < 30; i++) {
Thread.sleep(1000);
broadcaster.onNext(System.currentTimeMillis() + "\n" );
}
Thread.sleep(30000);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment