Skip to content

Instantly share code, notes, and snippets.

@fornarat
Created February 9, 2017 12:53
Show Gist options
  • Save fornarat/d72623b9b052d7de0f9f0140aeae2f5e to your computer and use it in GitHub Desktop.
Save fornarat/d72623b9b052d7de0f9f0140aeae2f5e to your computer and use it in GitHub Desktop.
package com.blah.akka;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import com.google.common.collect.Lists;
import com.hyperiongray.ferret.framework.service.utils.HttpParser;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import scala.util.Try;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Created by tomas on 9/02/17.
*/
public class AkkaStreamPageTest {

    /**
     * Immutable value object pairing the URI of a page to fetch with the
     * caller-defined state that must travel with it through the stream.
     */
    static class Page {
        private final URI uri;
        private final String internalState;

        /**
         * @param uri           address of the page to download
         * @param internalState opaque state associated with this page (a plain string in this example)
         */
        public Page(URI uri, String internalState) {
            this.uri = uri;
            this.internalState = internalState;
        }

        /** @return the address of the page */
        public URI getUri() {
            return uri;
        }

        /** @return the state that was attached to this page when it was created */
        public String getInternalState() {
            return internalState;
        }
    }

    /**
     * Downloads a fixed list of pages through the Akka HTTP super pool, splits every
     * response body into newline-delimited frames, extracts the links found in each
     * frame and prints them to standard output.
     *
     * <p>The {@link Page} is sent through the pool as the pass-through element of the
     * request pair, so the stage that handles a response still has access to the page
     * (and its internal state) that produced it. Links are resolved against the URI of
     * the page they were found on.
     *
     * <p>Response bodies are read one after another via {@code flatMapConcat}; a failure
     * while reading any body fails the whole stream. The actor system is terminated once
     * the stream has completed or failed, which lets the JVM exit.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        List<String> urls = Lists.newArrayList();
        urls.add("http://akka.io/docs/");
        urls.add("http://akka.io/downloads/");
        urls.add("http://akka.io/get-involved/");
        urls.add("http://doc.akka.io/docs/akka/2.4.16/java/stream/stream-customize.html/");

        final ActorSystem actorSystem = ActorSystem.create();
        final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

        // The second element of the pair is handed back unchanged next to the response,
        // so it is used to carry the Page instead of a meaningless constant.
        final Flow<Pair<HttpRequest, Page>, Pair<Try<HttpResponse>, Page>, NotUsed> requestClientFlow =
                Http.get(actorSystem).<Page>superPool(materializer);

        final Flow<ByteString, ByteString, NotUsed> framing =
                Framing.delimiter(ByteString.fromString("\n"), 256 * 1024, FramingTruncation.ALLOW);

        final Function<ByteString, String> transformEachLine =
                line -> line.decodeString("US-ASCII") /* some transformation here */;

        Source.from(urls)
                // On the page object we store a URI and an internal state (just a string
                // in this example, but it could be an arbitrarily complex object).
                .map(url -> new Page(URI.create(url), "blahblah" + System.currentTimeMillis()))
                .map(page -> Pair.create(HttpRequest.create(page.getUri().toASCIIString()), page))
                .via(requestClientFlow)
                // Requests that failed at the transport level are dropped.
                .filter(result -> result.first().isSuccess())
                .flatMapConcat(result -> {
                    // page.getInternalState() is available here for any per-page processing.
                    final Page page = result.second();
                    return result.first().get().entity().getDataBytes()
                            .via(framing)
                            .map(transformEachLine::apply)
                            .map(content -> {
                                Document doc = Jsoup.parse(content);
                                // Resolve relative links against the page they came from.
                                Set<URI> links = HttpParser.getLinksNew(page.getUri(), doc);
                                return links;
                            })
                            .filter(uris -> uris != null && !uris.isEmpty())
                            .mapConcat(uris -> uris);
                })
                .runWith(Sink.foreach(System.out::println), materializer)
                .whenComplete((done, failure) -> {
                    if (failure != null) {
                        System.err.println("Link extraction stream failed: " + failure);
                    }
                    // Without this the actor system's threads keep the JVM alive forever.
                    actorSystem.terminate();
                });
    }

    /**
     * Collects the targets of all {@code a[href]} elements of a document as URIs.
     *
     * <p>Empty links, fragment-only links (starting with {@code #}) and links starting
     * with {@code mailto} are skipped. Links that are not absolute are resolved against
     * {@code hostUri}. Links that cannot be parsed or resolved are dropped.
     *
     * <p>NOTE(review): this method is not called from {@code main}, which uses
     * {@code HttpParser.getLinksNew} instead — confirm whether it is still needed.
     *
     * @param hostUri base URI used to resolve non-absolute links
     * @param doc     parsed document to scan
     * @return the distinct, successfully parsed link URIs; possibly empty
     */
    private static Set<URI> getLinksNew(URI hostUri, Document doc) {
        return doc.select("a[href]")
                .stream()
                .map(htmlLink -> htmlLink.attr("href"))
                .filter(strLink -> strLink != null)
                .filter(strLink -> !strLink.trim().isEmpty())
                .filter(strLink -> !strLink.startsWith("#"))
                .filter(strLink -> !strLink.startsWith("mailto"))
                .map(strLink -> {
                    try {
                        URI uri = URI.create(strLink);
                        return uri.isAbsolute() ? uri : hostUri.resolve(strLink);
                    } catch (IllegalArgumentException ignored) {
                        // Deliberately best-effort: a malformed href on a page must not
                        // abort extraction of the remaining links, so it is skipped.
                        return null;
                    }
                })
                .filter(uri -> uri != null)
                .collect(Collectors.toSet());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment