@lossyrob
Created February 8, 2018 23:09
ExportSnapshot
package osmesa.analytics.oneoffs
import osmesa.analytics._
import cats.implicits._
import com.monovore.decline._
import geotrellis.vector._
import geotrellis.vector.io._
import geotrellis.vector.io.json._
import geotrellis.spark.util.KryoWrapper
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import spray.json._
import java.sql.Timestamp
import java.time._
import java.time.format._
import scala.util._
object ExportSnapshotCommand extends CommandApp(
  name = "export-snapshot",
  header = "Exports a snapshot of OSM based on history and changeset ORCs, a bounding box, and a point in time.",
  main = {
    val historyO = Opts.option[String]("history", help = "URI to the history ORC file to process.")
    val boundaryO = Opts.option[String]("boundary", help = "URI to the boundary GeoJSON.")
    val timeO = Opts.option[String]("time", help = "Snapshot date in ISO 8601 format.")
    val snapshotPathO = Opts.option[String]("snapshotPath", help = "URI for the output snapshot ORC.")

    (
      historyO,
      boundaryO,
      timeO,
      snapshotPathO
    ).mapN { (historyUri, boundaryUri, timeStr, snapshotUri) =>
      Try(LocalDateTime.parse(timeStr)) match {
        case Success(time) =>
          ExportSnapshot.run(historyUri, boundaryUri, time, snapshotUri)
        case Failure(e) =>
          e match {
            case _: DateTimeParseException =>
              println(s"Could not parse date string '${timeStr}'. Make sure it's in ISO 8601 format. Parse error: $e")
              sys.exit(1)
            case _ =>
              throw e
          }
      }
    }
  }
)
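
// Example invocation (the command name comes from `name` above; all paths are
// hypothetical):
//
//   export-snapshot \
//     --history s3://bucket/history.orc \
//     --boundary /tmp/boundary.geojson \
//     --time 2018-01-01T00:00:00 \
//     --snapshotPath s3://bucket/snapshot.orc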
object ExportSnapshot {
  def readFile(path: String): String = {
    val src = scala.io.Source.fromFile(path)
    try {
      src.mkString
    } finally {
      src.close()
    }
  }
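
  // Note: `Source.fromFile` only reads from the local filesystem, so the
  // boundary argument must be a local path even though it is labeled a URI.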

  def run(
    historyUri: String,
    boundaryUri: String,
    time: LocalDateTime,
    snapshotUri: String
  ): Unit = {
    val mp = {
      // Allow the GeoJSON to be a Polygon, a MultiPolygon, or a FeatureCollection
      // containing Polygons and/or MultiPolygons.
      val polygons =
        Try(readFile(boundaryUri).parseGeoJson[Polygon]).map(List(_)).getOrElse(List[Polygon]()) :::
        Try(readFile(boundaryUri).parseGeoJson[MultiPolygon]).map(_.polygons.toList).getOrElse(List[Polygon]()) :::
        Try(readFile(boundaryUri).parseGeoJson[JsonFeatureCollection]).map { collection =>
          collection.getAll[Polygon].toList :::
          collection.getAll[MultiPolygon].flatMap(_.polygons).toList
        }.getOrElse(List[Polygon]())

      MultiPolygon(polygons)
    }
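
    // `mp` collects every polygon any of the parse attempts produced; attempts
    // that fail contribute an empty list, so an unparseable boundary yields an
    // empty MultiPolygon that will match nothing downstream.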

    implicit val spark = Analytics.sparkSession("StatsJob")

    try {
      val history = spark.read.orc(historyUri)
      val df = createSnapshotDataFrame(history, mp, time)
      df.write.format("orc").save(snapshotUri)
    } finally {
      spark.stop()
    }
  }

  def createSnapshotDataFrame(
    history: DataFrame,
    boundary: MultiPolygon,
    time: LocalDateTime
  )(implicit spark: SparkSession): DataFrame = {
    import spark.implicits._

    val preparedGeom = KryoWrapper(boundary.prepare)
    val boundingBox = boundary.envelope

    val isInBoundary =
      udf { (lat: Double, lon: Double) =>
        if (boundingBox.contains(lon, lat)) {
          preparedGeom.value.contains(Point(lon, lat))
        } else { false }
      }
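
    // The UDF does a cheap bounding-box check before the precise containment
    // test. The prepared geometry is wrapped in KryoWrapper so it can be
    // captured by the UDF closure (prepared geometries are not Java-serializable).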

    val ts = Timestamp.valueOf(time)

    val timeFiltered =
      history.
        where($"timestamp" <= ts)

    val nodeIds =
      timeFiltered.
        where($"type" === "node").
        where(isInBoundary($"lat", $"lon")).
        select($"id")

    val wayIds =
      timeFiltered.
        where($"type" === "way").
        select($"id", explode($"nds").as("nodeId")).
        join(nodeIds.select($"id".as("nodeId")), "nodeId").
        drop($"nodeId")
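
    // `wayIds` keeps every way that references at least one in-boundary node;
    // it may contain one row per matching node, which the `distinct` on
    // `desiredIds` below collapses.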

    // Build a DataFrame of (relation id, target type, target id), where the
    // target is the node or way the relation refers to. OSM relations can
    // contain other relations, so nested relations are flattened out in a loop.
    val relationsToTargets =
      timeFiltered.
        where($"type" === "relation").
        select($"id", explode($"members").as("member")).
        select($"id", $"member.type".as("targetType"), $"member.id".as("targetId"))

    var flattenedRelationTargets =
      relationsToTargets.
        where($"targetType" =!= "relation")

    var relationsOfRelations =
      relationsToTargets.
        where($"targetType" === "relation").
        select($"id", $"targetId".as("relationId"))

    while (relationsOfRelations.count() > 0) {
      val df =
        relationsToTargets.select($"id".as("joinId"), $"targetId", $"targetType")

      // Pull the members of each nested relation up one level, keeping the
      // top-level relation's id.
      val flattened =
        relationsOfRelations.
          join(df, relationsOfRelations.col("relationId") === df.col("joinId")).
          select($"id", $"targetType", $"targetId")

      flattenedRelationTargets =
        flattened.where($"targetType" =!= "relation").union(flattenedRelationTargets)

      relationsOfRelations =
        flattened.
          where($"targetType" === "relation").
          select($"id", $"targetId".as("relationId"))
    }
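
    // The loop ends once no remaining member is itself a relation, i.e. every
    // nested relation has been flattened down to node and way targets. (A
    // referential cycle between relations would keep this loop from terminating.)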

    // Relations that reference an in-boundary node.
    val nodeRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "node").
        join(nodeIds.select($"id".as("targetId")), "targetId").
        select($"id")

    // Relations that reference one of the ways kept above.
    val wayRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "way").
        join(wayIds.select($"id".as("targetId")), "targetId").
        select($"id")

    val desiredIds =
      nodeIds.
        union(wayIds).
        union(nodeRelationIds).
        union(wayRelationIds).
        select($"id".as("joinId")).
        distinct
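
    // `desiredIds` is the de-duplicated set of node, way, and relation ids that
    // belong in the snapshot; joining it back against the time-filtered history
    // pulls in every version (<= ts) of those elements.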

    val filtered =
      timeFiltered.
        join(desiredIds, timeFiltered.col("id") === desiredIds.col("joinId")).
        drop("joinId")

    // Now we need to set the visibility correctly.
    // We need to refer back to the old history to make sure we don't include
    // anything that was deleted permanently, as opposed to simply being
    // superseded by a new version.
    ???
  }
}