Created
May 25, 2016 18:39
-
-
Save vpipkt/aa0a6a128cb79ba434d8b466f2ac49ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mil.nga.giat.geowave.analytics.spark.tools | |
import com.vividsolutions.jts.geom.LineString | |
import mil.nga.giat.geowave.adapter.vector.FeatureWritable | |
import mil.nga.giat.geowave.analytic.distance.DistanceFn | |
import mil.nga.giat.geowave.analytic.partitioner.Partitioner.PartitionData | |
import mil.nga.giat.geowave.analytics.spark._ | |
import mil.nga.giat.geowave.core.store.operations.remote.options.DataStorePluginOptions | |
import mil.nga.giat.geowave.mapreduce.input.{GeoWaveInputFormat, GeoWaveInputKey} | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.{SparkConf, SparkContext} | |
import org.opengis.feature.simple.SimpleFeature | |
import scala.collection.mutable.PriorityQueue | |
import scala.collection.mutable.HashMap | |
import collection.JavaConverters._ | |
object AnalyticRecipes extends Serializable { | |
// Connection settings for the GeoWave data store; `main` passes this map to
// DataStorePluginOptions and reads the namespace entry from it.
// NOTE(review): credentials and hosts are hard-coded literals here -- presumably
// intended for a local sandbox only; confirm before pointing at a shared cluster.
val gwParam: java.util.Map[String, String] = {
  val settings = HashMap.empty[String, String]
  settings += ("user" -> "root")
  settings += ("password" -> "secret")
  settings += ("zookeeper" -> "localhost:2181")
  settings += ("instance" -> "local")
  settings += ("gwNamespace" -> "geowave091.tracks")
  // Expose the Scala map through the java.util.Map interface.
  settings.asJava
}
/*val gwParam = Map()*/ | |
/*
Build it:
export BUILD_ARGS="-Daccumulo.version=1.6.2 -Dhadoop.version=2.6.0 -Dgeotools.version=14.2 -Dgeoserver.version=2.8.2 -Daccumulo.api=1.6 -Dfindbugs.skip=true -Dformatter.skip=true -DskipITs=true -DskipTests=true -T 12"
mvn clean install -rf :geowave-analytic-spark $BUILD_ARGS && \
mvn package -P geowave-tools-singlejar $BUILD_ARGS
Run it:
spark-submit --master yarn-client --class mil.nga.giat.geowave.analytics.spark.tools.AnalyticRecipes --jars /usr/local/geowave/tools/geowave-tools.jar geowave-analytic-spark-0.9.1-tools.jar
*/
/**
 * Reads every feature from the configured GeoWave store into an RDD, keys each
 * feature by a hash of selected attributes so that repeated copies share a key,
 * and prints two totals: the summed "totalDistance" attribute and the summed
 * number of points in each feature's default geometry. When several features
 * share a key, only the largest value for that key is counted.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Configure Spark
  val sc = new SparkContext(GeoWaveRDD.init(new SparkConf(true)))
  try {
    // GeoWave needs a data store description and a context before it can be read.
    val dspo = new DataStorePluginOptions("accumulo", gwParam)
    // The lookup key must match the spelling used when gwParam is built
    // ("gwNamespace"); a differently-cased key makes java.util.Map.get return null.
    val gwc = GeoWaveContext(dspo, "accumulo", gwParam.get("gwNamespace"))

    val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
    GeoWaveInputFormat.setDataStoreName(conf, gwc.dataStoreName)
    GeoWaveInputFormat.setStoreConfigOptions(conf, gwc.storeParameters)

    val sftRdd: RDD[(GeoWaveInputKey, SimpleFeature)] = sc.newAPIHadoopRDD(
      conf,
      classOf[GeoWaveInputFormat[SimpleFeature]],
      classOf[GeoWaveInputKey],
      classOf[SimpleFeature])

    /* Alternative kept for reference: extract (id, distance, numberTrackPoints) directly.
    val feats : RDD[(String, Double, Int)] = sftRdd.mapPartitions{ iter:Iterator[(GeoWaveInputKey,SimpleFeature)] =>
      iter.map{ v: (GeoWaveInputKey, SimpleFeature) => {
        // id, distance, numberTrackPoints
        (v._2.getID ,
          v._2.getAttribute("totalDistance").asInstanceOf[Double],
          v._2.getAttribute("numTrackPoints").asInstanceOf[Int]
        )
      }
      }
    }*/

    // Attributes whose string forms are concatenated (in this order) to build
    // the de-duplication key.
    val keyAttributes = Seq("geometry", "TimeStamp", "key", "totalDistance", "numTrackPoints")

    // Get an RDD of synthetic ids and simple features; the source may contain dupes.
    // NOTE(review): the key is a 32-bit String.hashCode, so distinct features can
    // collide and be merged; a null attribute value will throw on toString.
    val feats: RDD[(String, SimpleFeature)] = sftRdd.map { case (_, feature) =>
      val fingerprint = keyAttributes.map(name => feature.getAttribute(name).toString).mkString
      (fingerprint.hashCode.toString, feature)
    }
    // feats feeds two independent jobs below, so keep it in memory.
    feats.cache()

    /* //this works fine
    val ct = feats.map{kv: (String, SimpleFeature) =>
      (kv._1, 1)
    }.reduceByKey(_ + _).count() */

    // Per key, keep the largest reported distance so duplicates count once.
    // The geometry length is not used because it is in degrees and would have
    // to be projected first.
    val totalDist: RDD[(String, Double)] = feats
      .map { case (id, feature) => (id, feature.getAttribute("totalDistance").asInstanceOf[Double]) }
      .reduceByKey((a: Double, b: Double) => math.max(a, b))

    // fold (unlike reduce) yields the zero value instead of throwing on an empty RDD.
    val distanceSum = totalDist.map(_._2).fold(0.0)(_ + _)
    println(s"Total distance: $distanceSum km.")

    // Per key, keep the largest point count so duplicates count once.
    val totalPoints: RDD[(String, Int)] = feats
      .map { case (id, feature) =>
        val line = feature.getDefaultGeometry.asInstanceOf[LineString]
        (id, line.getNumPoints)
      }
      .reduceByKey((a: Int, b: Int) => math.max(a, b))

    val pointSum = totalPoints.map(_._2).fold(0)(_ + _)
    println(s"Total points: $pointSum.")

    //println("\nFind " + ct + " simple features by feature ID.")
    /* Unfinished sketch of a single-pass summary:
    var featSummary = feats.reduce{ (f1, f2) =>
      id =
      ("foo",1.0,1)
    }*/
  } finally {
    // Always release the Spark context, even when a job fails.
    sc.stop()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment