Skip to content

Instantly share code, notes, and snippets.

@spullara
Last active December 14, 2015 01:59
Show Gist options
  • Save spullara/5010077 to your computer and use it in GitHub Desktop.
Save spullara/5010077 to your computer and use it in GitHub Desktop.
Experiments with JDK 1.8 streaming operations
package twitterprocessor;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.FlatMapper;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import static java.util.stream.ConcurrentCollectors.groupingReduce;
import static twitterprocessor.Util.squelch;
/**
 * Reads a gzip-compressed feed of tweets, one JSON document per line, and reports how many
 * links point at each host.
 *
 * <p>The feed comes either from a local file or from an {@code s3://bucket/key} URL (the
 * latter requires an AWS properties file containing {@code accessKey} and {@code secretKey}).
 * Links are read from each tweet's {@code "l"} field. Hosts with at least 10 links are
 * printed as {@code host,count} in descending count order, followed by a summary line and
 * the elapsed time in milliseconds.
 */
public class App {

    /** Local path or {@code s3://} URL of the gzip-compressed tweet feed. */
    @Argument(alias = "f", description = "File or s3 url", required = true)
    private static String file;

    /** Properties file holding the AWS credentials; only needed for {@code s3://} input. */
    @Argument(alias = "a", description = "AWS properties file specifying accessKey and secretKey")
    private static File auth;

    /**
     * Parses the command line, streams the feed in parallel and prints the per-host link counts.
     *
     * @param args command-line arguments, bound to the {@link Argument}-annotated fields
     * @throws IOException if the feed or the AWS properties file cannot be read
     * @throws URISyntaxException if the {@code s3://} URL is not a valid URI
     * @throws IllegalArgumentException if an {@code s3://} URL is given without an auth file
     */
    public static void main(String[] args) throws IOException, URISyntaxException {
        try {
            Args.parse(App.class, args);
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
            Args.usage(App.class);
            System.exit(1);
        }

        // NOTE(review): squelch comes from Util, which is not visible here; presumably it
        // swallows checked exceptions so malformed records are dropped - confirm.
        MappingJsonFactory jf = new MappingJsonFactory();
        FlatMapper<String, JsonNode> lineToJson = squelch((line, consumer) -> consumer.accept(jf.createParser(line).readValueAsTree()));

        // Links that start with a bare host label ("example.com/...") get an http:// prefix.
        Pattern noProtocol = Pattern.compile("^[A-Za-z0-9-]+[.]");
        FlatMapper<String, URL> toURL = squelch((link, consumer) -> {
            if (noProtocol.matcher(link).find()) {
                consumer.accept(new URL("http://" + link));
            } else {
                consumer.accept(new URL(link));
            }
        });

        // Emits every element of the tweet's "l" field; tweets without that field emit nothing.
        FlatMapper<JsonNode, String> toLinks = (tweet, consumer) -> {
            JsonNode links = tweet.get("l");
            if (links != null) {
                links.forEach(link -> consumer.accept(link.textValue()));
            }
        };

        long start = System.currentTimeMillis();
        AtomicInteger tweets = new AtomicInteger();
        AtomicInteger links = new AtomicInteger();
        ConcurrentMap<String, Integer> map;

        // try-with-resources closes the reader and the underlying file/S3 stream even when
        // the pipeline fails part-way through. The raw stream is declared separately so it is
        // still closed if the GZIPInputStream constructor throws.
        try (InputStream in = openInput();
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), "UTF-8"))) {
            map = bufferedReader.lines()
                    .parallel()
                    .peek(a -> {
                        // Print the value this thread obtained; re-reading the shared counter
                        // could show a number other threads have already advanced.
                        int seen = tweets.incrementAndGet();
                        if (seen % 100000 == 0) {
                            System.out.println("Processed " + seen);
                        }
                    })
                    .flatMap(lineToJson)
                    .flatMap(toLinks)
                    .peek(a -> links.incrementAndGet())
                    .flatMap(toURL)
                    .collectUnordered(groupingReduce(URL::getHost, ConcurrentHashMap::new, u -> 1, Integer::sum));
        }

        List<Map.Entry<String, Integer>> entries = new ArrayList<>(map.entrySet());
        // Descending by count; Integer.compare avoids the overflow-prone subtraction idiom.
        entries.sort((e1, e2) -> Integer.compare(e2.getValue(), e1.getValue()));
        entries.stream().filter(entry -> entry.getValue() >= 10).forEach(entry -> {
            System.out.println(entry.getKey() + "," + entry.getValue());
        });
        System.out.println(entries.size() + " unique domains in " + links + " links from " + tweets + " tweets");
        System.out.println(System.currentTimeMillis() - start);
    }

    /**
     * Opens the raw (still gzip-compressed) feed named by {@link #file}.
     *
     * @return a stream over the local file or the S3 object content; the caller must close it
     * @throws IOException if the local file or the AWS properties file cannot be read
     * @throws URISyntaxException if the {@code s3://} URL is not a valid URI
     * @throws IllegalArgumentException if an {@code s3://} URL is given without an auth file
     */
    private static InputStream openInput() throws IOException, URISyntaxException {
        if (!file.startsWith("s3://")) {
            return new FileInputStream(file);
        }
        if (auth == null) {
            throw new IllegalArgumentException("You must specify an auth properties file for AWS");
        }

        Properties aws = new Properties();
        // Close the credentials file as soon as it has been loaded.
        try (InputStream authIn = new FileInputStream(auth)) {
            aws.load(authIn);
        }

        // s3://bucket/key -> host is the bucket, path minus the leading '/' is the key.
        URI url = new URI(file);
        GetObjectRequest get = new GetObjectRequest(url.getHost(), url.getPath().substring(1));
        AmazonS3Client client = new AmazonS3Client(new BasicAWSCredentials(aws.getProperty("accessKey"), aws.getProperty("secretKey")));
        S3Object object = client.getObject(get);
        return object.getObjectContent();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment