Sample ES PIG Query
- Use the bootstrap script from https://gist.github.com/tnbredillet/867111b8e1e600fa588e
- Use AMI 3.0.4 (3.1.x AMIs don't seem to work)
- Ensure use of a public subnet for access to the Internet (S3 & code download)
- Ensure that the EMR security groups have access to the ES ELB
- Enable S3 server-side encryption with: --bootstrap-action s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hadoop --args "-e,fs.s3.enableServerSideEncryption=true" (a rough launch sketch follows this list)
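
For reference, a minimal launch sketch using boto 2 (the Python AWS SDK current at the time); the cluster name, key pair, and log bucket below are placeholders, not values from this gist:

# minimal sketch, assuming boto 2.x; all names/IDs are placeholders
from boto.emr import connect_to_region
from boto.emr.bootstrap_action import BootstrapAction

conn = connect_to_region("us-west-2")

# the configure-hadoop bootstrap action from the note above
encrypt_s3 = BootstrapAction(
    "Enable S3 server-side encryption",
    "s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hadoop",
    ["-e", "fs.s3.enableServerSideEncryption=true"])

jobflow_id = conn.run_jobflow(
    name="es-pig-demo",                 # placeholder
    ami_version="3.0.4",                # per the notes, 3.1.x doesn't work
    ec2_keyname="my-keypair",           # placeholder
    log_uri="s3://my-log-bucket/emr/",  # placeholder
    num_instances=3,
    master_instance_type="m1.large",
    slave_instance_type="m1.large",
    bootstrap_actions=[encrypt_s3],
    keep_alive=True)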
/* es_sample.pig
   Silly little proof-of-concept query to pull ES data into Apache Pig
*/
REGISTER s3://log-inbox.elk.sch/bin/elasticsearch-hadoop-2.1.0.Beta1.jar;

/*
Either use the full class name (ugh) or use a DEFINE with _all_ parameters
specified. You can't parameterize DEFINEs in Pig. (More ugh)
*/
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=internal-elasticsearch-lb-532683521.us-west-2.elb.amazonaws.com', 'es.query=?q=ntp');

-- grab all the results, filter to the first 10, and dump to console
A = LOAD '_all/sidewinder' USING EsStorage();
B = LIMIT A 10;
DUMP B;
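
To run the script on the cluster, one option is to submit it as an EMR step. A sketch with boto 2, assuming the script has been uploaded to S3 (the bucket path is a placeholder) and reusing the connection from the launch sketch above:

# sketch: submit es_sample.pig as an EMR Pig step via boto 2
from boto.emr.step import PigStep

step = PigStep("es_sample", pig_file="s3://my-bucket/scripts/es_sample.pig")  # placeholder path
conn.add_jobflow_steps(jobflow_id, [step])  # conn/jobflow_id from the launch sketch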
# PySpark proof of concept: read documents out of Elasticsearch via
# elasticsearch-hadoop's EsInputFormat, then query them with Spark SQL
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName(appName).setMaster(master)  # appName/master set elsewhere
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)

es_conf = {"es.resource": "index/type"}  # assume Elasticsearch is running on localhost defaults
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=es_conf)
rdd.first()  # the result is a (doc ID, MapWritable converted to a Python dict) tuple:
# (u'Elasticsearch ID',
#  {u'field1': True,
#   u'field2': u'Some Text',
#   u'field3': 12345})

dicts = rdd.map(lambda x: x[1])    # keep just the document bodies
table = sqlCtx.inferSchema(dicts)  # infer a Spark SQL schema from the dicts
table.registerAsTable("table")
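
With the table registered, it can be queried through Spark SQL. A small example, assuming the field names from the rdd.first() output above:

# query the registered table; field names come from the sample document above
results = sqlCtx.sql("SELECT field2, field3 FROM table WHERE field1 = true")
for row in results.collect():
    print(row)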
import re

# named groups use the (?P<name>...) syntax; the literal dots in the IP
# address are escaped
pattern = re.compile(r"(?P<item>.)foobar(?P<srcip>\d+\.\d+\.\d+\.\d+)")
matchobj = pattern.match(line)
if matchobj:
    print("Found elements {} and {}".format(matchobj.group("item"),
                                            matchobj.group("srcip")))
SPARK_MEM="2g" /home/hadoop/spark/bin/spark-shell

val file = sc.textFile("s3://bigdatademo/sample/wiki/")
// split on spaces, key on column 2, and sum the integer counts in column 3
// across 3 reduce partitions
val reducedList = file.map(l => l.split(" ")).map(l => (l(1), l(2).toInt)).reduceByKey(_+_, 3)
reducedList.cache
// flip to (count, key), sort descending, and take the top 50
val sortedList = reducedList.map(x => (x._2, x._1)).sortByKey(false).take(50)
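
A rough PySpark equivalent of the Scala session above, under the same assumption that the input is space-delimited with the key in column 2 and an integer count in column 3:

# rough PySpark translation of the Scala shell session above
file = sc.textFile("s3://bigdatademo/sample/wiki/")
reduced = (file.map(lambda l: l.split(" "))
               .map(lambda l: (l[1], int(l[2])))
               .reduceByKey(lambda a, b: a + b, 3))
reduced.cache()
top50 = reduced.map(lambda x: (x[1], x[0])).sortByKey(False).take(50)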