Skip to content

Instantly share code, notes, and snippets.

@fbiville
Last active August 29, 2015 14:27
Show Gist options
  • Save fbiville/263518f5f5517297ae83 to your computer and use it in GitHub Desktop.
Save fbiville/263518f5f5517297ae83 to your computer and use it in GitHub Desktop.
A little fun with Reactor [Reactor Environment has been initialized beforehand]
package fr.vidal.oss.crawler;
import fr.vidal.oss.crawler.cache.UrlCache;
import fr.vidal.oss.crawler.model.LinkedFeed;
import fr.vidal.oss.crawler.parsing.Either;
import fr.vidal.oss.crawler.parsing.FetchError;
import fr.vidal.oss.jaxb.atom.core.Link;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Predicate;
import reactor.rx.broadcast.Broadcaster;
import java.util.function.Function;
import java.util.stream.Stream;
/**
 * Feed crawler built on two Reactor {@link Broadcaster}s that feed each other.
 *
 * <ul>
 *   <li>Links pushed into {@code linkSink} are dropped when the {@link UrlCache}
 *       already knows their href; the remaining ones are handed to
 *       {@code linkConsumer} and the result is pushed into {@code apiResponseSink}.</li>
 *   <li>Results pushed into {@code apiResponseSink} are kept only when they are a
 *       "right" value; the feed's href is stored in the cache, the feed is handed
 *       to {@code feedParser}, and every resulting link is pushed back into
 *       {@code linkSink}.</li>
 * </ul>
 *
 * <p>"Left" results (fetch errors) are filtered out and not reported anywhere by
 * this class.
 *
 * <p>NOTE(review): an href is stored in the cache only after its feed has come
 * back through {@code apiResponseSink}, so the same link may pass the cache
 * filter more than once before that happens - confirm whether duplicate fetches
 * are acceptable.
 *
 * <p>Both pipelines are dispatched on {@link Environment#cachedDispatcher()};
 * presumably the Reactor {@code Environment} must be initialized before this
 * class is instantiated - TODO confirm.
 */
public class Crawler {

    /**
     * Time, in milliseconds, that {@link #crawl(Link)} blocks the calling thread
     * after submitting a link.
     */
    private static final long CRAWL_PAUSE_MILLIS = 500;

    private final UrlCache cache;
    private final Function<Link, Either<FetchError, LinkedFeed>> linkConsumer;
    private final Function<LinkedFeed, Stream<Link>> feedParser;
    private final Broadcaster<Either<FetchError, LinkedFeed>> apiResponseSink;
    private final Broadcaster<Link> linkSink;
    private final Dispatcher dispatcher;

    /**
     * Creates the crawler and immediately wires both sinks together.
     *
     * @param cache           cache of hrefs, consulted before a link is consumed and
     *                        updated after a feed has been received
     * @param linkConsumer    turns a link into either a fetch error or a feed
     * @param feedParser      extracts the links contained in a feed
     * @param apiResponseSink broadcaster receiving the results of {@code linkConsumer}
     * @param linkSink        broadcaster receiving the links to process
     */
    public Crawler(UrlCache cache,
                   Function<Link, Either<FetchError, LinkedFeed>> linkConsumer,
                   Function<LinkedFeed, Stream<Link>> feedParser,
                   Broadcaster<Either<FetchError, LinkedFeed>> apiResponseSink,
                   Broadcaster<Link> linkSink) {

        this.dispatcher = Environment.cachedDispatcher();
        this.cache = cache;
        this.linkConsumer = linkConsumer;
        this.feedParser = feedParser;
        this.apiResponseSink = apiResponseSink;
        this.linkSink = linkSink;
        configureSinks();
    }

    /**
     * Submits {@code link} to the link sink, then blocks the calling thread for
     * {@value #CRAWL_PAUSE_MILLIS} ms.
     *
     * <p>NOTE(review): the pause presumably gives the dispatched pipeline time to
     * make progress before the caller continues; nothing here waits for the crawl
     * to actually finish - confirm this is intended.
     *
     * @param link the link to start crawling from
     * @throws RuntimeException if the calling thread is interrupted while pausing;
     *                          the thread's interrupt flag is restored before throwing
     */
    public void crawl(Link link) {
        linkSink.onNext(link);
        try {
            Thread.sleep(CRAWL_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Wires the two sinks into a loop: links produce API responses, and
     * successful API responses produce more links.
     */
    private void configureSinks() {
        linkSink.dispatchOn(dispatcher)
                .filter(isNotCached())
                .map(linkConsumer::apply)
                .consume(apiResponseSink::onNext);

        apiResponseSink.dispatchOn(dispatcher)
                .filter(Either::isRight)
                .map(Either::getRight)
                .map(cacheAndReturn())
                .map(feedParser::apply)
                .consume(links -> links.forEach(linkSink::onNext));
    }

    /**
     * @return a predicate accepting only the links whose href is not in the cache
     */
    private Predicate<Link> isNotCached() {
        return link -> !cache.isInCache(link.getHref());
    }

    /**
     * @return a function that stores the feed's href in the cache and returns the
     *         feed unchanged, so it can be used as a pass-through pipeline step
     */
    private reactor.fn.Function<LinkedFeed, LinkedFeed> cacheAndReturn() {
        return linkedFeed -> {
            cache.storeInCache(linkedFeed.getHref());
            return linkedFeed;
        };
    }
}
@fbiville
Copy link
Author

While testing this against http://api.vidal.fr, it seems to stop after 3 HTTP requests... 😞

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment