Skip to content

Instantly share code, notes, and snippets.

@tzolov
Last active December 16, 2015 11:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tzolov/5429016 to your computer and use it in GitHub Desktop.
Save tzolov/5429016 to your computer and use it in GitHub Desktop.
Example Elasticsearch Crunch Integration (using ElasticSearch-Hadoop). The application reads a sample 'twitter' index from ES, counts the messages per user and writes the resulting counts to a different ES index. This example uses {@link Map} as a means to represent the data read from and written to ES. Prerequisite: latest Crunch:0.6.0-SNAPSHOT and latest…
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.tzolov.es;
import java.io.Serializable;
import java.util.Map;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.crunch.ESTarget;
import org.elasticsearch.hadoop.crunch.ESTypedSource;
import org.elasticsearch.hadoop.crunch.ESTypes;
import com.google.common.collect.Maps;
/**
* ElasticSerach-Hadoop (via Crunch) Integration.
*
* Application reads a 'twitter' index from ES, counts the messages per user and writes the result counts to different ES index.
*
* This example uses {@link Map} as a mean to represent the data read-from and written-to ES.
*
* Prerequisite: latest Crunch:0.6.0-SNAPSHOT and latest ES-Hadoop.
*
* Run: hadoop jar target/es-hadoop-integration-1.0-SNAPSHOT-job.jar <ES server name> <ES server port>
*
* (Christian Tzolov) tzolov@apache.org
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class ESCrunch extends Configured implements Tool, Serializable {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new ESCrunch(), args);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: hadoop jar es-hadoop-integration-1.0-SNAPSHOT-job.jar [generic options] esHost esPort \n");
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
String esHost = args[0].trim();
Integer esPort = Integer.valueOf(args[1].trim());
Pipeline pipeline = new MRPipeline(ESCrunch.class, getConf());
ESTypedSource<Map> esSource = new ESTypedSource.Builder<Map>("twitter/tweet/_search?q=user:*", Map.class).setHost(esHost).setPort(esPort).build();
ESTarget esTarget = new ESTarget.Builder("statistics/count").setHost(esHost).setPort(esPort).build();
pipeline
.read(esSource) // query twitter index from ES
.parallelDo("ToUserName", new ExtractField("user"), Writables.strings()) // extract the user names
.count() // compute messages per user
.parallelDo("ToJavaMap", new ConvertToJavaMap(), ESTypes.map()) // convert counts into java Map
.write(esTarget); // create or update the ES 'statistics' index
return pipeline.done().succeeded() ? 0 : 1;
}
static class ExtractField extends MapFn<Map, String> {
private String fieldName;
public ExtractField(String fieldName) {
this.fieldName = fieldName;
}
public String map(Map tweet) {
return tweet.get(fieldName).toString();
}
}
static class ConvertToJavaMap extends MapFn<Pair<String, Long>, Map> {
private Map map = Maps.newHashMap();
public Map map(Pair<String, Long> messageCount) {
map.clear();
map.put("userName", messageCount.first());
map.put("tweetCount", messageCount.second());
return map;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment