@MLnick
Created May 16, 2014 16:59
Elasticsearch / Spark / Shark setup
Elasticsearch : "1.1.1"
Spark : "0.9.1-hadoop1"
Shark : "0.9.1-hadoop1"
elasticsearch-hadoop-hive : "elasticsearch-hadoop-hive-2.0.0.RC1.jar"
elasticsearch-hadoop : "2.0.0.RC1"
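
For reference, a minimal build.sbt sketch matching these versions might look like the following. The Maven coordinates, the Scala version and the joda-time version are assumptions on my part; Shark is not included since it was typically run from its own distribution rather than pulled in as a library dependency.

// Minimal build.sbt sketch for the versions above (coordinates are assumptions)
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"           % "0.9.1",
  "org.elasticsearch" %  "elasticsearch-hadoop" % "2.0.0.RC1",
  "joda-time"         %  "joda-time"            % "2.3"
)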
- Spark using EsInputFormat works fine. However, the type returned for the "date" field ("_ts") is Text. I convert it via toString then toLong to get the timestamp, which I can then use as I wish within Spark.
- Shark returns NULL for the timestamp field. There is nothing odd about the timestamps themselves: I can access them in Spark (as Text), and I can do date math on that field in Elasticsearch queries. This also returns NULLs on EC2, so it is not just my machine. (See the workaround sketch at the end of this gist.)
shark> describe extended test;
OK
ts timestamp from deserializer
Detailed Table Information Table(tableName:test, dbName:default, owner:Nick, createTime:1400259117, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:ts, type:timestamp, comment:null)], location:file:/user/hive/warehouse/test, inputFormat:org.elasticsearch.hadoop.hive.EsHiveInputFormat, outputFormat:org.elasticsearch.hadoop.hive.EsHiveOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.elasticsearch.hadoop.hive.EsSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1400259117, storage_handler=org.elasticsearch.hadoop.hive.EsStorageHandler, es.mapping.names=ts:_ts, es.resource=my_index/my_type}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)
Time taken: 0.054 seconds
shark> create external table test (ts timestamp)
> STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
> TBLPROPERTIES(
> 'es.resource' = 'my_index/my_type',
> 'es.mapping.names' = 'ts:_ts');
OK
Time taken: 0.223 seconds
shark> select * from test limit 5;
OK
NULL
NULL
NULL
NULL
NULL
import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.elasticsearch.hadoop.mr.{EsInputFormat, LinkedMapWritable}
import org.joda.time.DateTime
import org.joda.time.chrono.ISOChronology

object TestElasticsearch extends App {

  val sc = new SparkContext("local[2]", "test-es")

  // es-hadoop picks up its connection and query settings from the Hadoop Configuration
  val conf = new Configuration()
  conf.set("es.resource", "my_index/my_type")
  conf.set("es.query", "?q=*")

  // Each record comes back as (NullWritable, LinkedMapWritable), i.e. a map of field name -> Writable value
  val data = sc.newAPIHadoopRDD(conf,
    classOf[EsInputFormat[NullWritable, LinkedMapWritable]],
    classOf[NullWritable],
    classOf[LinkedMapWritable]
  )

  val timestamps = data.map { case (_, mapWritable) =>
    val newMap = mapWritable.toMap.map { case (k, v) => (k.toString, v) }
    (
      newMap("_ts").getClass.getName,     // the field comes back as org.apache.hadoop.io.Text
      newMap("_ts").toString.toLong,      // epoch millis
      new DateTime(newMap("_ts").toString.toLong).
        withChronology(ISOChronology.getInstanceUTC) // all my timestamps are in UTC
    )
  }

  println(timestamps.take(5).mkString("\n"))
}
/*
== Output ==
14/05/16 18:46:43 INFO spark.SparkContext: Job finished: take at Test.scala:75, took 0.151369 s
(org.apache.hadoop.io.Text,1397130476805,2014-04-10T11:47:56.805Z)
(org.apache.hadoop.io.Text,1397130476904,2014-04-10T11:47:56.904Z)
(org.apache.hadoop.io.Text,1397130476923,2014-04-10T11:47:56.923Z)
(org.apache.hadoop.io.Text,1397130476928,2014-04-10T11:47:56.928Z)
(org.apache.hadoop.io.Text,1397130476933,2014-04-10T11:47:56.933Z)
*/
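
A possible, untested workaround sketch for the Shark side: declare the column as bigint so the raw epoch-millis value comes through instead of going down the Hive timestamp deserialization path, then cast to a timestamp at query time. Everything here is an assumption on my part: the object and table names are made up, the SharkContext calls (initWithSharkContext, sql) just follow the Shark example docs, and I have not verified that a bigint mapping actually avoids the NULLs.

import shark.SharkEnv

object SharkTimestampWorkaround extends App {

  val sc = SharkEnv.initWithSharkContext("test-es-workaround")

  sc.sql("DROP TABLE IF EXISTS test_raw")
  sc.sql("""CREATE EXTERNAL TABLE test_raw (ts BIGINT)
            STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
            TBLPROPERTIES(
              'es.resource' = 'my_index/my_type',
              'es.mapping.names' = 'ts:_ts')""")

  // Hive treats a numeric value cast to TIMESTAMP as seconds since the epoch,
  // so divide the millisecond value by 1000 before casting.
  sc.sql("SELECT ts, CAST(ts / 1000.0 AS TIMESTAMP) FROM test_raw LIMIT 5").foreach(println)
}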