@IvanFernandez
Last active October 10, 2016 17:19
Gist to persist tweets from Spark Streaming to ElasticSearch
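// Imports used by this fragment; the statements below belong inside main(String[] args).
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;

// args[0]: Spark master URL, args[1]: type name under the "test" index, args[2]: Elasticsearch node(s)
String mode = args[0];
final String collection = args[1];
SparkConf sparkConf = new SparkConf();
sparkConf.set("es.nodes", args[2]);
sparkConf.setAppName("Spark to ElasticSearch PoC");
sparkConf.setMaster(mode);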
// Twitter configuration (the setters are chained, so only the last call ends with a semicolon)
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
    .setOAuthConsumerKey(getProperties("consumer.key"))
    .setOAuthConsumerSecret(getProperties("consumer.secret"))
    .setOAuthAccessToken(getProperties("access.token"))
    .setOAuthAccessTokenSecret(getProperties("secret.token"));
TwitterFactory tf = new TwitterFactory(cb.build());
Twitter twitter = tf.getInstance();
// Streaming context with a 1-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
JavaDStream<Status> tweets = TwitterUtils.createStream(jssc, twitter.getAuthorization());
// Convert each Status into a flat field map that can be indexed as a document
JavaDStream<Map<String, String>> statuses = tweets
    .map(new Function<Status, Map<String, String>>() {
        @Override
        public Map<String, String> call(Status status) {
            return tweetConverter(status);
        }
    });
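// Write every micro-batch RDD to Elasticsearch (foreach is a deprecated alias of foreachRDD in Spark 1.x)
statuses.foreachRDD(new Function<JavaRDD<Map<String, String>>, Void>() {
    private static final long serialVersionUID = 6272424972267329328L;

    @Override
    public Void call(JavaRDD<Map<String, String>> rdd) throws Exception {
        // Resource format is "index/type": index "test", type taken from args[1]
        JavaEsSpark.saveToEs(rdd, "test/" + collection);
        return null;
    }
});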
jssc.start();
// Wait up to 20 seconds (the timeout is in milliseconds), then return
jssc.awaitTermination(20000);
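
The snippet calls getProperties(...) for the four OAuth values but does not show that helper. The following is a minimal sketch of one way it could be backed, assuming the credentials live in a standard Java properties source. The class name CredentialStore, its get method, and the fail-fast behaviour on missing keys are hypothetical and not part of the original gist; only the property keys are taken from it.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not from the gist): loads key=value pairs and fails fast on missing keys.
public class CredentialStore {
    private final Properties props = new Properties();

    // Reads properties from any Reader, e.g. a FileReader over a credentials file.
    public CredentialStore(Reader source) throws IOException {
        props.load(source);
    }

    // Returns the trimmed value for the key, or throws if it is absent or blank.
    public String get(String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values for illustration only; real credentials would come from a file.
        String demo = "consumer.key=demo-key\nconsumer.secret=demo-secret\n";
        CredentialStore store = new CredentialStore(new StringReader(demo));
        System.out.println(store.get("consumer.key"));
    }
}
```

With something like this in place, getProperties(key) in the snippet could simply delegate to store.get(key). Throwing on a missing key is a design choice of this sketch: it surfaces a misconfigured credentials file at startup instead of as an authentication failure once the stream is running.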