Sample ES PIG Query
Setup notes:
- Use the bootstrap script from https://gist.github.com/tnbredillet/867111b8e1e600fa588e
- Use AMI 3.0.4 (the 3.1.x AMIs don't seem to work)
- Ensure the cluster is in a public subnet so it has Internet access (S3 & code download)
- Ensure that the EMR security groups have access to the ES ELB
- Add the bootstrap action s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hadoop --args "-e,fs.s3.enableServerSideEncryption=true" (a cluster-launch sketch using this action follows this list)
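Roughly, the cluster launch with that bootstrap action might look like the boto3 sketch below. boto3 isn't part of the original notes, and the instance types, key pair, subnet ID, and IAM role names are placeholders to adapt to your own account.

import boto3

emr = boto3.client("emr", region_name="us-west-2")

# Minimal sketch: small cluster on AMI 3.0.4 with the server-side-encryption
# bootstrap action from the notes above. All names/IDs below are placeholders.
response = emr.run_job_flow(
    Name="es-pig-demo",
    AmiVersion="3.0.4",
    Instances={
        "MasterInstanceType": "m1.large",
        "SlaveInstanceType": "m1.large",
        "InstanceCount": 3,
        "Ec2KeyName": "my-keypair",
        "Ec2SubnetId": "subnet-xxxxxxxx",  # a public subnet, per the notes
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    BootstrapActions=[
        {
            "Name": "configure-hadoop",
            "ScriptBootstrapAction": {
                "Path": "s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hadoop",
                "Args": ["-e", "fs.s3.enableServerSideEncryption=true"],
            },
        }
    ],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])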
/* es_sample.pig
   Silly little proof-of-concept query to pull ES data into Apache Pig
*/
REGISTER s3://log-inbox.elk.sch/bin/elasticsearch-hadoop-2.1.0.Beta1.jar;
/*
Either use the full class name (ugh) or use a DEFINE with _all_ parameters
specified. You can't parameterize DEFINEs in Pig. (More ugh)
*/
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=internal-elasticsearch-lb-532683521.us-west-2.elb.amazonaws.com', 'es.query = ?q=ntp');
-- grab all the results, limit to the first 10, and dump them to the console
A = LOAD '_all/sidewinder'
USING EsStorage();
B = LIMIT A 10;
DUMP B;
# PySpark setup: build the SparkContext and SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(appName).setMaster(master)  # appName/master supplied by the caller
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
rdd.first() # the result is a MapWritable that is converted to a Python dict
# Example first record -- a (document ID, dict) tuple:
# (u'Elasticsearch ID',
#  {u'field1': True,
#   u'field2': u'Some Text',
#   u'field3': 12345})
dicts = rdd.map(lambda x: x[1])        # keep just the document bodies, dropping the ES IDs
table = sqlCtx.inferSchema(dicts)      # infer a schema from the dicts (Spark 1.x API)
table.registerAsTable("table")         # expose it to Spark SQL as "table"
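From there the registered table can be queried with Spark SQL. A quick sanity check, using the field names from the sample record above:

# Query the registered table; field names come from the sample record above
rows = sqlCtx.sql("SELECT field2, field3 FROM table WHERE field3 > 0")
for row in rows.collect():
    print row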
import re

# Pull the item marker and source IP out of a line using named groups
pattern = re.compile(r"(?P<item>.)foobar(?P<srcip>\d+\.\d+\.\d+\.\d+)")
matchobj = pattern.match(line)
if matchobj:
    print "Found elements {} and {}".format(matchobj.group("item"), matchobj.group("srcip"))
# Scala spark-shell session: aggregate the sample wiki data and pull the top 50 keys by count
SPARK_MEM="2g" /home/hadoop/spark/bin/spark-shell

// split each space-delimited line, key on field 1, sum field 2 as an integer
val file = sc.textFile("s3://bigdatademo/sample/wiki/")
val reducedList = file.map(l => l.split(" ")).map(l => (l(1), l(2).toInt)).reduceByKey(_+_, 3)
reducedList.cache
// flip to (count, key) pairs, sort descending, and take the top 50
val sortedList = reducedList.map(x => (x._2, x._1)).sortByKey(false).take(50)
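For comparison, a rough PySpark equivalent of the Scala snippet above, assuming the same space-delimited layout and the sc context from earlier:

# Same aggregation in PySpark: key on field 1, sum field 2, take the top 50 by count
wiki = sc.textFile("s3://bigdatademo/sample/wiki/")
pairs = wiki.map(lambda l: l.split(" ")).map(lambda f: (f[1], int(f[2])))
reduced = pairs.reduceByKey(lambda a, b: a + b, 3)
reduced.cache()
top50 = reduced.map(lambda kv: (kv[1], kv[0])).sortByKey(False).take(50)
print top50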