Rasterization
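Rasterizes ship-route geometries read from Parquet into a GeoTrellis tile layer, builds a pyramid from maxZoom down to minZoom, and writes each level to an S3 catalog.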
import geotrellis.proj4.WebMercator
import geotrellis.raster.{ArrayTile, IntConstantNoDataCellType, MutableArrayTile, RasterExtent, Tile}
import geotrellis.raster.rasterize._
import geotrellis.raster.resample.Max
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.io.s3._
import geotrellis.spark.pyramid._
import geotrellis.spark.tiling._
import geotrellis.vector._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, gt}
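// NOTE: `reproject` and `Adder` below come from the mixed-in `Implicits`
// trait, which is not part of this gist; a hedged sketch of what it might
// contain appears after the object below.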
object SaveS3RasterLayer extends App with Implicits {

  val _spark = SparkSession
    .builder()
    // Kryo with the GeoTrellis registrator is needed to serialize tiles efficiently.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator")
    //.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
    //.config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    //.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    //.config("spark.kryoserializer.buffer.max.mb", "1024")
    .config("spark.sql.crossJoin.enabled", true)
    .config("spark.logConf", "true")
    .master("local[*]")
    .appName("Raster")
    .config("spark.app.input.path", "data/data_pe.csv")
    .config("spark.ui.enabled", true)
    // Note: spark.driver.memory has no effect when set here; the driver JVM is
    // already running. Pass it via spark-submit instead.
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "9g")
    //.config("spark.sql.shuffle.partitions", 2001)
    //.config("spark.network.timeout", "600s")
    .getOrCreate()

  import _spark.implicits._
  try {
    val conf = _spark.sparkContext.getConf
    implicit val sc = _spark.sparkContext
    implicit val _sql = _spark.sqlContext

    // Register the GeoTrellis geometry UDTs/encoders with Spark SQL.
    gt.gtRegister(_sql)

    val minZoom = conf.getInt("spark.app.minZoom", 4)
    val maxZoom = conf.getInt("spark.app.maxZoom", 4)
    val output_shipRoutes = conf.get("spark.app.shipRoutes_output", "data/shipRoutes/")
    val output = "some-bucket"
    val layer_name = conf.get("spark.app.layername", "type7")
    // Read the ship routes, reproject each MultiLine to WebMercator, and
    // explode it into its component Lines.
    val lines = _spark.read.parquet(output_shipRoutes)
      .withColumn("geom_reprojected", reproject($"Geom"))
      .select($"geom_reprojected")
      .as[MultiLine]
      .flatMap(x => x.lines)
      .as[Line]
    val tileSize = 512
    val layout = ZoomedLayoutScheme(WebMercator, tileSize).levelForZoom(maxZoom).layout
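    // With the defaults above (maxZoom = 4, 512px tiles) this layout is a
    // 2^4 x 2^4 = 16 x 16 grid of 512 x 512 tiles, so spatial keys range
    // over SpatialKey(0..15, 0..15).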
    // Metadata spanning the full WebMercator extent at the given zoom. The
    // cell type matches the IntConstantNoDataCellType tiles created in
    // createTiledRddFromLines below.
    def getMetadata(zoom: Int) = {
      val lvl = ZoomedLayoutScheme(WebMercator, tileSize).levelForZoom(zoom).layout
      TileLayerMetadata(IntConstantNoDataCellType,
        lvl,
        lvl.extent,
        WebMercator,
        KeyBounds(SpatialKey(0, 0),
          SpatialKey(lvl.layoutCols - 1, lvl.layoutRows - 1)))
    }
    // Burn a constant value of 1 into every cell the line touches.
    def lineTile(tile: MutableArrayTile, line: Line, rasterExtent: RasterExtent): Tile = {
      rasterExtent.foreach(line) { (col, row) =>
        tile.set(col, row, 1)
      }
      tile
    }
    // Rasterize one line into a copy of the accumulator tile for its key.
    def stampLine(tile: MutableArrayTile, tup: (SpatialKey, Line)): MutableArrayTile = {
      val (spatialKey, line) = tup
      // Extent of this tile in world coordinates.
      val tileExtent = layout.mapTransform(spatialKey)
      val re = RasterExtent(tileExtent, tile)
      val result = tile.copy.asInstanceOf[MutableArrayTile]
      lineTile(result, line, re)
      result
    }
    // Key each line by every layout tile its envelope intersects, dropping
    // keys that fall outside the layout grid.
    def lineToSpatialKey(l: Line): Array[(SpatialKey, Line)] = {
      val lineExtent = l.envelope
      val gridBounds = layout.mapTransform(lineExtent)
      for {
        (c, r) <- gridBounds.coords
        if c >= 0 && c < layout.layoutCols
        if r >= 0 && r < layout.layoutRows
      } yield (SpatialKey(c, r), l)
    }
    // Rasterize all lines into per-key tiles. stampLine burns each line into
    // the per-partition accumulator tile (seqOp); sumTiles merges partial
    // tiles from different partitions cell-wise (combOp).
    def createTiledRddFromLines(lines: RDD[Line]): RDD[(SpatialKey, Tile)] = {
      lines.flatMap(lineToSpatialKey)
        .mapPartitions({ partition =>
          partition.map { case (spatialKey, line) =>
            (spatialKey, (spatialKey, line))
          }
        }, preservesPartitioning = true)
        .aggregateByKey(ArrayTile.empty(IntConstantNoDataCellType, layout.tileCols, layout.tileRows))(stampLine, sumTiles)
        .mapValues { tile: MutableArrayTile => tile.asInstanceOf[Tile] }
    }
    // Cell-wise sum of two partial tiles; Adder comes from the Implicits trait.
    def sumTiles(t1: MutableArrayTile, t2: MutableArrayTile): MutableArrayTile = {
      Adder(t1, t2).asInstanceOf[MutableArrayTile]
    }
    val metadata = getMetadata(maxZoom)
    val contextRdd = new ContextRDD(createTiledRddFromLines(lines.rdd), metadata)
    val store = S3AttributeStore(output, layer_name)
    val writer = S3LayerWriter(store)
    val layoutScheme: ZoomedLayoutScheme = ZoomedLayoutScheme(WebMercator, tileSize)

    // Build the pyramid from maxZoom down to minZoom, resampling with Max,
    // and write each level, replacing any level that already exists.
    Pyramid.upLevels(contextRdd, layoutScheme, maxZoom, minZoom, Max) { (rdd, zoom) =>
      println("Pyramiding operation")
      val layerId: LayerId = LayerId(layer_name, zoom)
      if (store.layerExists(layerId)) {
        new S3LayerManager(store).delete(layerId)
      }
      writer.write(layerId, rdd, ZCurveKeyIndexMethod)
    }
println("All processes finished.") | |
} | |
finally { | |
_spark.stop() | |
} | |
} | |
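The `Implicits` trait mixed into `SaveS3RasterLayer` is not included in this gist. A minimal sketch of what it might contain, inferred from the two call sites above, could look like the following; both names and bodies are assumptions, not the author's code:

import geotrellis.proj4.{LatLng, WebMercator}
import geotrellis.raster.Tile
import geotrellis.raster.mapalgebra.local.Add
import geotrellis.vector._
import org.apache.spark.sql.functions.udf

trait Implicits {
  // Assumed: reprojects each route geometry from LatLng to WebMercator.
  // Relies on the MultiLine UDT registered by gt.gtRegister.
  val reproject = udf { ml: MultiLine => ml.reproject(LatLng, WebMercator) }

  // Assumed: cell-wise local addition of two tiles.
  def Adder(t1: Tile, t2: Tile): Tile = Add(t1, t2)
}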
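Once the layer is written, a tile can be read back and rendered for a quick visual check. A sketch, assuming GeoTrellis 1.x and the bucket/layer names used above; the zoom level 4 and key (7, 5) are arbitrary examples:

import geotrellis.raster.Tile
import geotrellis.raster.render._
import geotrellis.spark.{LayerId, SpatialKey}
import geotrellis.spark.io._
import geotrellis.spark.io.s3.S3ValueReader

object ReadBackExample extends App {
  val reader = S3ValueReader("some-bucket", "type7")

  // Fetch a single tile from the zoom-4 level of the layer.
  val tile: Tile = reader.reader[SpatialKey, Tile](LayerId("type7", 4)).read(SpatialKey(7, 5))

  // Render the line-count values with a color ramp and write a PNG for inspection.
  tile.renderPng(ColorRamps.BlueToRed).write("tile.png")
}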