package mil.nga.giat.geowave.analytics.spark.tools
import com.vividsolutions.jts.geom.LineString
import mil.nga.giat.geowave.adapter.vector.FeatureWritable
import mil.nga.giat.geowave.analytic.distance.DistanceFn
import mil.nga.giat.geowave.analytic.partitioner.Partitioner.PartitionData
import mil.nga.giat.geowave.analytics.spark._
import mil.nga.giat.geowave.core.store.operations.remote.options.DataStorePluginOptions
import mil.nga.giat.geowave.mapreduce.input.{GeoWaveInputFormat, GeoWaveInputKey}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.opengis.feature.simple.SimpleFeature
import scala.collection.mutable.PriorityQueue
import scala.collection.mutable.HashMap
import collection.JavaConverters._
object AnalyticRecipes extends Serializable {
  val gwParam: java.util.Map[String, String] = HashMap(
    "user" -> "root",
    "password" -> "secret",
    "zookeeper" -> "localhost:2181",
    "instance" -> "local",
    "gwNamespace" -> "geowave091.tracks"
  ).asJava
  /* val gwParam = Map() */
  /*
  Build it:
    export BUILD_ARGS="-Daccumulo.version=1.6.2 -Dhadoop.version=2.6.0 -Dgeotools.version=14.2 -Dgeoserver.version=2.8.2 -Daccumulo.api=1.6 -Dfindbugs.skip=true -Dformatter.skip=true -DskipITs=true -DskipTests=true -T 12"
    mvn clean install -rf :geowave-analytic-spark $BUILD_ARGS && \
    mvn package -P geowave-tools-singlejar $BUILD_ARGS

  Run it:
    spark-submit --master yarn-client --class mil.nga.giat.geowave.analytics.spark.tools.AnalyticRecipes --jars /usr/local/geowave/tools/geowave-tools.jar geowave-analytic-spark-0.9.1-tools.jar
  */
  def main(args: Array[String]): Unit = {
    // Configure Spark
    val sc = new SparkContext(GeoWaveRDD.init(new SparkConf(true)))

    // Pieces needed to point GeoWave at the Accumulo store
    val dspo = new DataStorePluginOptions("accumulo", gwParam)
    val gwc = GeoWaveContext(dspo, "accumulo", gwParam.get("gwNamespace")) // note: key must match the map above

    val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
    GeoWaveInputFormat.setDataStoreName(conf, gwc.dataStoreName)
    GeoWaveInputFormat.setStoreConfigOptions(conf, gwc.storeParameters)

    // Read SimpleFeatures out of GeoWave as an RDD
    val sftRdd: RDD[(GeoWaveInputKey, SimpleFeature)] = sc.newAPIHadoopRDD(
      conf,
      classOf[GeoWaveInputFormat[SimpleFeature]],
      classOf[GeoWaveInputKey],
      classOf[SimpleFeature])
    /*
    val feats: RDD[(String, Double, Int)] = sftRdd.mapPartitions { iter: Iterator[(GeoWaveInputKey, SimpleFeature)] =>
      iter.map { v: (GeoWaveInputKey, SimpleFeature) =>
        // id, distance, numberTrackPoints
        (v._2.getID,
          v._2.getAttribute("totalDistance").asInstanceOf[Double],
          v._2.getAttribute("numTrackPoints").asInstanceOf[Int])
      }
    }
    */
    // Key each feature by a hash of its identifying attributes, so any
    // duplicate features the input format returns share the same key
    val feats: RDD[(String, SimpleFeature)] = sftRdd.map { v: (GeoWaveInputKey, SimpleFeature) =>
      val hashit = v._2.getAttribute("geometry").toString +
        v._2.getAttribute("TimeStamp").toString +
        v._2.getAttribute("key").toString +
        v._2.getAttribute("totalDistance").toString +
        v._2.getAttribute("numTrackPoints").toString
      (hashit.hashCode.toString, v._2)
    }
    feats.cache()
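
    // A minimal sketch (not in the original recipe): if a single deduplicated
    // RDD is wanted, rather than the per-metric reductions below, keeping one
    // representative feature per hash key is enough.
    val distinctFeats: RDD[(String, SimpleFeature)] =
      feats.reduceByKey { (a, _) => a }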
    /* // This works fine: count distinct hash keys
    val ct = feats.map { kv: (String, SimpleFeature) => (kv._1, 1) }
      .reduceByKey(_ + _).count()
    */
    // Per-track distance: reducing by max collapses duplicates to one value per key
    val totalDist: RDD[(String, Double)] = feats.map { kv: (String, SimpleFeature) =>
      (kv._1, kv._2.getAttribute("totalDistance").asInstanceOf[Double])
      // val g = kv._2.getDefaultGeometry.asInstanceOf[LineString]
      // (kv._1, g.getLength) // NOPE: this length is in degrees; the geometry must be projected (or measured geodesically) first
    }.reduceByKey { (a: Double, b: Double) => if (a > b) a else b }

    println("Total distance: " + totalDist.map(_._2).reduce(_ + _) + " km.")
    // Per-track vertex count, read directly from the line geometry
    val totalPoints = feats.map { kv: (String, SimpleFeature) =>
      val g = kv._2.getDefaultGeometry.asInstanceOf[LineString]
      (kv._1, g.getNumPoints)
    }.reduceByKey { (a: Int, b: Int) => if (a > b) a else b }

    println("Total points: " + totalPoints.map(_._2).reduce(_ + _) + ".")
    // println("\nFind " + ct + " simple features by feature ID.")
    /*
    var featSummary = feats.reduce { (f1, f2) =>
      id =
      ("foo", 1.0, 1)
    }
    */
  }
}