Created May 16, 2014 16:59
Elasticsearch / Spark / Shark setup
Elasticsearch: 1.1.1
Spark: 0.9.1-hadoop1
Shark: 0.9.1-hadoop1
elasticsearch-hadoop-hive: elasticsearch-hadoop-hive-2.0.0.RC1.jar
elasticsearch-hadoop: 2.0.0.RC1
- Spark using EsInputFormat works fine. However, the type returned for the "date" field ("_ts") is Text. Converting it via toString then toLong yields the epoch-millis timestamp, which I can use as I wish within Spark.
- Shark returns NULL for the timestamp field. There is nothing odd about the timestamps themselves: I can access them in Spark (as Text), and I can do date math on that field in Elasticsearch queries. The same table also returns NULLs on EC2, so it's not specific to my machine.
shark> create external table test (ts timestamp)
     > STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
     > TBLPROPERTIES(
     > 'es.resource' = 'my_index/my_type',
     > 'es.mapping.names' = 'ts:_ts');
OK
Time taken: 0.223 seconds

shark> describe extended test;
OK
ts	timestamp	from deserializer
Detailed Table Information	Table(tableName:test, dbName:default, owner:Nick, createTime:1400259117, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:ts, type:timestamp, comment:null)], location:file:/user/hive/warehouse/test, inputFormat:org.elasticsearch.hadoop.hive.EsHiveInputFormat, outputFormat:org.elasticsearch.hadoop.hive.EsHiveOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.elasticsearch.hadoop.hive.EsSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1400259117, storage_handler=org.elasticsearch.hadoop.hive.EsStorageHandler, es.mapping.names=ts:_ts, es.resource=my_index/my_type}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
Time taken: 0.054 seconds

shark> select * from test limit 5;
OK
NULL
NULL
NULL
NULL
NULL
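Since the Spark job below shows `_ts` coming back as Text holding epoch milliseconds, one possible workaround (a sketch, not verified against this setup; `test_raw` is a hypothetical table name, and whether the SerDe deserializes the field cleanly as bigint is an assumption) is to map the field as bigint and do the timestamp conversion in SQL rather than relying on the SerDe's timestamp handling:

```sql
-- Hypothetical workaround: read _ts as epoch millis (bigint) instead of timestamp
create external table test_raw (ts bigint)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.resource' = 'my_index/my_type',
  'es.mapping.names' = 'ts:_ts');

-- from_unixtime expects seconds, so divide the millis by 1000 first
select cast(from_unixtime(cast(ts / 1000 as bigint)) as timestamp)
from test_raw limit 5;
```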
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.elasticsearch.hadoop.mr.{EsInputFormat, LinkedMapWritable}
import org.joda.time.DateTime
import org.joda.time.chrono.ISOChronology

object TestElasticsearch extends App {
  import collection.JavaConversions._

  val sc = new SparkContext("local[2]", "test-es")

  // Configure elasticsearch-hadoop to read everything from my_index/my_type
  val conf = new Configuration()
  conf.set("es.resource", "my_index/my_type")
  conf.set("es.query", "?q=*")

  val data = sc.newAPIHadoopRDD(conf,
    classOf[EsInputFormat[NullWritable, LinkedMapWritable]],
    classOf[NullWritable],
    classOf[LinkedMapWritable]
  )

  val timestamps = data.map {
    case (_, mapWritable) =>
      val newMap = mapWritable.toMap.map { case (k, v) => (k.toString, v) }
      (
        newMap("_ts").getClass.getName,   // comes back as org.apache.hadoop.io.Text
        newMap("_ts").toString.toLong,    // epoch millis
        new DateTime(newMap("_ts").toString.toLong).
          withChronology(ISOChronology.getInstanceUTC) // all my timestamps are in UTC
      )
  }
  println(timestamps.take(5).mkString("\n"))
}
/*
== Output ==
14/05/16 18:46:43 INFO spark.SparkContext: Job finished: take at Test.scala:75, took 0.151369 s
(org.apache.hadoop.io.Text,1397130476805,2014-04-10T11:47:56.805Z)
(org.apache.hadoop.io.Text,1397130476904,2014-04-10T11:47:56.904Z)
(org.apache.hadoop.io.Text,1397130476923,2014-04-10T11:47:56.923Z)
(org.apache.hadoop.io.Text,1397130476928,2014-04-10T11:47:56.928Z)
(org.apache.hadoop.io.Text,1397130476933,2014-04-10T11:47:56.933Z)
*/
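The joda-time conversion above is one option; a minimal alternative sketch, assuming Java 8 is available, is to feed the same epoch-millis value into java.time, which interprets it as UTC directly so no explicit chronology is needed:

```scala
import java.time.Instant

// Epoch millis as extracted from the Text value via toString.toLong above
val millis = 1397130476805L

val instant = Instant.ofEpochMilli(millis)
println(instant) // 2014-04-10T11:47:56.805Z, matching the joda-time output
```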