Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:11
Show Gist options
  • Save smaldini/cf46ddde2d7459551d34 to your computer and use it in GitHub Desktop.
Save smaldini/cf46ddde2d7459551d34 to your computer and use it in GitHub Desktop.
package reactor.rx;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.AbstractReactorTest;
import reactor.fn.tuple.Tuple;
import reactor.io.IOStreams;
import reactor.rx.stream.MapStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static java.util.concurrent.TimeUnit.SECONDS;
import static reactor.Environment.cachedDispatcher;
import static reactor.Environment.sharedDispatcher;
/**
 * Sample test that pushes a handful of text fragments through a Reactor stream pipeline and
 * logs, for each one-second window, the ten most frequent whitespace-separated words.
 *
 * <p>Note that every non-blank word is counted; nothing in the pipeline restricts the count to
 * the {@code #}-prefixed words present in the sample.
 *
 * @author Stephane Maldini
 */
public class PopularTagTests extends AbstractReactorTest {

    private static final Logger LOG = LoggerFactory.getLogger(PopularTagTests.class);

    /** Text fragments used as the input of the sample pipeline; each one is split on single spaces. */
    private static final List<String> PULP_SAMPLE = Arrays.asList(
        "Look, ",
        "just because I don't be givin' no man a #foot massage don't make it right for #Marsellus #to throw " +
            "Antwone",
        " ",
        "into a glass #motherfucker house, ",
        "fuckin' up the way the nigger talks. ",
        "#Motherfucker do that shit #to" +
            " " +
            "me,",
        " he ",
        "better paralyze my ass, ",
        "'cause I'll kill the #motherfucker , ",
        "know what I'm sayin'?"
    );

    /**
     * Builds the word-count pipeline over {@link #PULP_SAMPLE} and waits for it to finish.
     *
     * @throws Exception if the pipeline has not signalled the latch within the wait performed by
     *                   {@code awaitLatch}
     */
    @Test
    public void sampleTest() throws Exception {
        // Released by the third callback handed to consume() below (presumably the completion callback).
        CountDownLatch latch = new CountDownLatch(1);

        // A Java Chronicle backed persistent map that is auto-deleted on JVM shutdown for testing
        // purposes. Each update on the map will trigger subscriber signals.
        MapStream<String, Integer> persistentMap = IOStreams.persistentMap("popularTags", true);

        Controls top10every1second =
            Streams.from(PULP_SAMPLE)
                .dispatchOn(sharedDispatcher())
                // Split every fragment into words on its own dispatcher, dropping blank tokens and
                // pausing on each word to imitate a slow source.
                .flatMap(samuelJackson ->
                    Streams
                        .from(samuelJackson.split(" "))
                        .dispatchOn(cachedDispatcher())
                        .filter(w -> !w.trim().isEmpty())
                        .observe(i -> simulateLatency())
                )
                // Pair each word with an initial count of 1 so it can be reduced by key.
                .map(w -> Tuple.of(w, 1))
                .window(1, SECONDS)
                .flatMap(s ->
                    BiStreams
                        .reduceByKey(s, persistentMap, (acc, next) -> acc + next)
                        // Highest count first; operands are swapped rather than negating the result.
                        .sort((a, b) -> b.t2.compareTo(a.t2))
                        .take(10)
                        .finallyDo(signal ->
                            LOG.info("------------------------ window complete! ----------------------"))
                )
                .consume(
                    entry -> LOG.info("{}: {}", entry.t1, entry.t2),
                    error -> LOG.error("", error),
                    nil -> latch.countDown()
                );

        awaitLatch(top10every1second, latch);
    }

    /**
     * Blocks the calling thread for 100 ms to imitate a slow upstream.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is restored so that
     * the owner of the thread can still observe it, and the event is logged.
     */
    private void simulateLatency() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while simulating latency", e);
        }
    }

    /**
     * Waits up to 10 seconds for {@code latch} to reach zero.
     *
     * @param tail  the pipeline controls, whose {@code debug()} output is appended to the failure message
     * @param latch the latch expected to be released by the pipeline
     * @throws Exception if the latch is still above zero after the wait, or if the waiting thread
     *                   is interrupted
     */
    // NOTE(review): no unchecked operation is visible in this method; confirm whether the
    // suppression is still required.
    @SuppressWarnings("unchecked")
    private void awaitLatch(Controls tail, CountDownLatch latch) throws Exception {
        if (!latch.await(10, SECONDS)) {
            throw new Exception("Never completed: (" + latch.getCount() + ") "
                + tail.debug());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment