Created
February 8, 2018 23:09
-
-
Save lossyrob/4b95306204e8a660d1807ba1883fae9b to your computer and use it in GitHub Desktop.
ExportSnapshot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package osmesa.analytics.oneoffs | |
import osmesa.analytics._ | |
import cats.implicits._ | |
import com.monovore.decline._ | |
import geotrellis.vector._ | |
import geotrellis.vector.io._ | |
import geotrellis.vector.io.json._ | |
import geotrellis.spark.util.KryoWrapper | |
import org.apache.spark.sql._ | |
import org.apache.spark.sql.functions._ | |
import spray.json._ | |
import java.sql.Timestamp | |
import java.time._ | |
import java.time.format._ | |
import scala.util._ | |
/** CLI entry point: export a snapshot of OSM from a history ORC, clipped to a
  * GeoJSON boundary, as of a given instant. Delegates the actual work to
  * [[ExportSnapshot.run]].
  */
object ExportSnapshotCommand extends CommandApp(
  name = "export-snapshot",
  header = "Exports a snapshot of OSM based on history and changeset ORCs, a bounding box, and a point in time.",
  main = {
    // Required command-line options.
    val historyO = Opts.option[String]("history", help = "URI to the history ORC file to process.")
    val boundaryO = Opts.option[String]("boundary", help = "URI to geojson of boundary")
    val timeO = Opts.option[String]("time", help = "ISO 8601 format of snapshot date.")
    val snapshotPathO = Opts.option[String]("snapshotPath", help = "URI for output snapshot ORC")

    (historyO, boundaryO, timeO, snapshotPathO).mapN { (historyUri, boundaryUri, timeStr, snapshotUri) =>
      // Validate the snapshot time up front so a malformed date fails fast
      // with a friendly message instead of a stack trace.
      Try(LocalDateTime.parse(timeStr)) match {
        case Success(time) =>
          ExportSnapshot.run(historyUri, boundaryUri, time, snapshotUri)
        case Failure(e: DateTimeParseException) =>
          println(s"Could not parse date string '${timeStr}'. Make sure it's in ISO 8601 format. Parse error: $e")
          sys.exit(1)
        case Failure(e) =>
          // Anything other than a parse failure is unexpected; let it propagate.
          throw e
      }
    }
  }
)
/** Builds and writes a snapshot of OSM elements (nodes, ways, relations) that
  * intersect a boundary polygon, as of a point in time, from a full-history ORC.
  */
object ExportSnapshot {

  /** Reads the entire contents of a local file as a string, always closing the
    * underlying source.
    *
    * NOTE(review): `scala.io.Source.fromFile` only handles local paths — if
    * `boundaryUri` may be an s3:// or hdfs:// URI this will fail; confirm
    * against callers.
    */
  def readFile(path: String): String = {
    val src = scala.io.Source.fromFile(path)
    try {
      src.mkString
    } finally {
      src.close()
    }
  }

  /** Parses the boundary GeoJSON, builds the snapshot DataFrame and writes it
    * out as ORC.
    *
    * @param historyUri  URI of the OSM history ORC to read.
    * @param boundaryUri path of the boundary GeoJSON file.
    * @param time        snapshot instant; only element versions at or before it are kept.
    * @param snapshotUri output location for the snapshot ORC.
    */
  def run(
    historyUri: String,
    boundaryUri: String,
    time: LocalDateTime,
    snapshotUri: String
  ): Unit = {
    val mp = {
      // Allow the GeoJSON to be a Polygon, MultiPolygon, or FeatureCollection
      // with Polygons or MultiPolygons. Read the file once and try each parse.
      val geojson = readFile(boundaryUri)
      val polygons =
        Try(geojson.parseGeoJson[Polygon]).map(List(_)).getOrElse(List[Polygon]()) :::
        Try(geojson.parseGeoJson[MultiPolygon]).map(_.polygons.toList).getOrElse(List[Polygon]()) :::
        // Bug fix: the original called .getAll[MultiPolygon] inside the Try and
        // then .getAll[Polygon] on the resulting Seq; parse the collection only
        // and extract both geometry kinds from it here.
        Try(geojson.parseGeoJson[JsonFeatureCollection]).map { collection =>
          collection.getAll[Polygon].toList :::
            collection.getAll[MultiPolygon].flatMap(_.polygons).toList
        }.getOrElse(List[Polygon]())
      MultiPolygon(polygons)
    }

    implicit val spark = Analytics.sparkSession("StatsJob")

    try {
      val history = spark.read.orc(historyUri)
      val df = createSnapshotDataFrame(history, mp, time)
      df.write.format("orc").save(snapshotUri)
    } finally {
      spark.stop()
    }
  }

  /** Filters the history down to the elements relevant to the boundary at the
    * snapshot time: in-boundary nodes, ways referencing them, and relations
    * (transitively flattened) referencing either.
    *
    * NOTE(review): still unimplemented at the end (`???`) — visibility handling
    * was never finished; see the TODO below.
    */
  def createSnapshotDataFrame(
    history: DataFrame,
    boundary: MultiPolygon,
    time: LocalDateTime
  )(implicit spark: SparkSession): DataFrame = {
    import spark.implicits._

    // KryoWrapper lets the (non-serializable) prepared geometry ship to executors.
    val preparedGeom = KryoWrapper(boundary.prepare)
    val boundingBox = boundary.envelope

    // Cheap envelope rejection before the more expensive prepared-geometry test.
    val isInBoundary =
      udf { (lat: Double, lon: Double) =>
        if(boundingBox.contains(lon, lat)) {
          preparedGeom.value.contains(Point(lon, lat))
        } else { false }
      }

    val ts = Timestamp.valueOf(time)

    // Only element versions that existed at or before the snapshot time.
    val timeFiltered =
      history.
        where($"timestamp" <= ts)

    // Nodes whose coordinates fall inside the boundary.
    val nodeIds =
      timeFiltered.
        where($"type" === "node").
        where(isInBoundary($"lat", $"lon")).
        select($"id")

    // Ways referencing at least one in-boundary node (may contain duplicates;
    // de-duplicated later in desiredIds).
    val wayIds =
      timeFiltered.
        where($"type" === "way").
        select($"id", explode($"nds").as("nodeId")).
        join(nodeIds.select($"id".as("nodeId")), "nodeId").
        drop($"nodeId")

    // We want to create a DF that has the relation ID,
    // the type of related element that is a way or node,
    // and the target ID.
    // OSM can have relations of relations, so we need to flatten any
    // relations out in a loop.
    val relationsToTargets =
      timeFiltered.
        where($"type" === "relation").
        select($"id", explode($"members").as("member")).
        select($"id", $"member.type".as("targetType"), $"member.id".as("targetId"))

    // Members pointing directly at nodes/ways; grows as nested relations flatten.
    var flattenedRelationTargets =
      relationsToTargets.
        where($"targetType" =!= "relation")

    // (id, relationId) pairs still pointing at child relations.
    var relationsOfRelations =
      relationsToTargets.
        where($"targetType" === "relation").
        select($"id", $"targetId".as("relationId"))

    // NOTE(review): a membership cycle between relations would loop forever —
    // TODO confirm input is acyclic or add a max-depth guard.
    while(relationsOfRelations.count() > 0) {
      // Alias the member table so the self-join columns are unambiguous.
      val df =
        relationsToTargets.select($"id".as("joinId"), $"targetId", $"targetType")

      // Bug fixes vs original: join the aliased `df` (not relationsToTargets)
      // on the real column name "relationId" (not "relation"), and drop both
      // join keys so the schema lines up with flattenedRelationTargets.
      val flattened =
        relationsOfRelations.
          join(df, relationsOfRelations.col("relationId") === df.col("joinId")).
          drop("relationId", "joinId")

      // union() is positional, so select columns in the same order as
      // flattenedRelationTargets (id, targetType, targetId).
      flattenedRelationTargets =
        flattened.
          where($"targetType" =!= "relation").
          select($"id", $"targetType", $"targetId").
          union(flattenedRelationTargets)

      relationsOfRelations =
        flattened.
          where($"targetType" === "relation").
          select($"id", $"targetId".as("relationId"))
    }

    // Relations whose (flattened) members include an in-boundary node.
    // Bug fix: alias the *column* to "targetId" (the original aliased the
    // DataFrame, so the USING-join column did not exist).
    val nodeRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "node").
        join(nodeIds.select($"id".as("targetId")), "targetId").
        select($"id")

    // Relations whose (flattened) members include a selected way.
    // Bug fix: join against wayIds here — the original joined nodeIds.
    val wayRelationIds =
      flattenedRelationTargets.
        where($"targetType" === "way").
        join(wayIds.select($"id".as("targetId")), "targetId").
        select($"id")

    // All element ids we want in the snapshot.
    // NOTE(review): OSM ids are only unique per element type; joining on id
    // alone below may also pull in same-id elements of other types — confirm
    // this is intended.
    val desiredIds =
      nodeIds.
        union(wayIds).
        union(nodeRelationIds).
        union(wayRelationIds).
        select($"id".as("joinId")).
        distinct

    val filtered =
      timeFiltered.
        join(desiredIds, timeFiltered.col("id") === desiredIds.col("joinId")).
        drop("joinId")

    // TODO: set the visibility correctly. We need to refer back to the old
    // history to make sure we don't include anything that was deleted
    // permanently, not because of a new version. Unimplemented in the original.
    ???
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment