@Charmatzis
Created July 24, 2017 09:27
Rasterization — burn ship-route lines into a GeoTrellis tile layer, pyramid it, and save it to S3
import geotrellis.proj4.{LatLng, WebMercator}
import geotrellis.raster.{ArrayTile, DoubleCellType, IntConstantNoDataCellType, MutableArrayTile, RasterExtent, Tile}
import geotrellis.raster.rasterize._ // rasterize syntax (RasterExtent.foreach over a geometry), used in lineTile below
import geotrellis.raster.render.ColorRamps
import geotrellis.raster.resample.Max
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.spark.io.s3._
import geotrellis.spark.pyramid._
import geotrellis.spark.tiling._
import geotrellis.vector._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, gt}
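
// Pipeline overview: read ship-route geometries from Parquet, key each Line by the
// WebMercator layout tiles it crosses, rasterize the lines into per-tile rasters,
// then pyramid the layer from maxZoom down to minZoom and write every level to S3.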
object SaveS3RasterLayer extends App with Implicits {

  val _spark = SparkSession
    .builder()
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "geotrellis.spark.io.kryo.KryoRegistrator")
    //.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
    //.config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    //.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    //.config("spark.kryoserializer.buffer.max.mb", "1024")
    .config("spark.sql.crossJoin.enabled", true)
    .config("spark.logConf", "true")
    .master("local[*]")
    .appName("Raster")
    .config("spark.app.input.path", "data/data_pe.csv")
    .config("spark.ui.enabled", true)
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "9g")
    //.config("spark.sql.shuffle.partitions", 2001)
    //.config("spark.network.timeout", "600s")
    .getOrCreate()
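  // Note (added): with master("local[*]") the driver and executors share a JVM that is
  // already running, so the memory settings above only take effect when supplied at
  // launch time (e.g. via spark-submit), not from this builder.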

  import _spark.implicits._

  try {
    val conf = _spark.sparkContext.getConf
    implicit val sc = _spark.sparkContext
    implicit val _sql = _spark.sqlContext

    // register GeoTrellis SQL support (gt helpers) with the SQLContext
    gt.gtRegister(_sql)

    val minZoom = conf.getInt("spark.app.minZoom", 4)
    val maxZoom = conf.getInt("spark.app.maxZoom", 4)
    val output_shipRoutes = conf.get("spark.app.shipRoutes_output", "data/shipRoutes/")
    val output = "some-bucket"
    val layer_name = conf.get("spark.app.layername", "type7")

    // Ship routes: reproject the stored geometry column and explode each
    // MultiLine into its individual Lines.
    val lines = _spark.read.parquet(output_shipRoutes)
      .withColumn("geom_reprojected", reproject($"Geom"))
      .select($"geom_reprojected")
      .as[MultiLine]
      .flatMap(x => x.lines)
      .as[Line]

    // WebMercator tile layout at the target zoom level, 512x512-pixel tiles.
    val tileSize = 512
    val layout = ZoomedLayoutScheme(WebMercator, tileSize).levelForZoom(maxZoom).layout
    // Layer metadata covering the full layout extent.
    // Note: `layout` above is fixed to maxZoom, so the `zoom` parameter is effectively unused here.
    def getMetadata(zoom: Int = 10) =
      TileLayerMetadata(
        DoubleCellType,
        layout,
        layout.extent,
        WebMercator,
        KeyBounds(
          SpatialKey(0, 0),
          SpatialKey(layout.layoutCols - 1, layout.layoutRows - 1)))
    // Burn a single Line into the tile: every cell the line crosses is set to 1.
    def lineTile(tile: MutableArrayTile, line: Line, rasterExtent: RasterExtent): Tile = {
      rasterExtent.foreach(line) { (col, row) =>
        tile.set(col, row, 1)
      }
      tile
    }

    // Burn one Line into a copy of the accumulator tile for its SpatialKey
    // (seqOp of the aggregation in createTiledRddFromLines).
    def stampLine(tile: MutableArrayTile, tup: (SpatialKey, Line)): MutableArrayTile = {
      val (spatialKey, line) = tup
      val tileExtent = layout.mapTransform(spatialKey)
      val re = RasterExtent(tileExtent, tile)
      val result = tile.copy.asInstanceOf[MutableArrayTile]
      lineTile(result, line, re)
      result
    }
    // Key a Line by every layout tile its envelope intersects, clipped to the layout grid.
    def lineToSpatialKey(l: Line): Array[(SpatialKey, Line)] = {
      val lineExtent = l.envelope
      val gridBounds = layout.mapTransform(lineExtent)
      for {
        (c, r) <- gridBounds.coords
        if r < layout.tileLayout.totalRows
        if c < layout.tileLayout.totalCols
      } yield (SpatialKey(c, r), l)
    }
    // Merge two partial tiles for the same SpatialKey by summing cell values.
    // `Adder` is not defined in this file; it is presumed to come from the project's
    // Implicits (a local tile Add operation).
    def sumTiles(t1: MutableArrayTile, t2: MutableArrayTile): MutableArrayTile =
      Adder(t1, t2).asInstanceOf[MutableArrayTile]

    // Flat-map each Line onto the SpatialKeys of the tiles it crosses, then aggregate
    // per key: stampLine burns a line into the accumulator tile, sumTiles merges the
    // per-partition accumulators.
    def createTiledRddFromLines(lines: RDD[Line], zoomLevel: Int = 10): RDD[(SpatialKey, Tile)] =
      lines.flatMap(lineToSpatialKey)
        .mapPartitions({ partition =>
          partition.map { case (spatialKey, line) =>
            // keep the key inside the value so stampLine knows which tile extent to use
            (spatialKey, (spatialKey, line))
          }
        }, preservesPartitioning = true)
        .aggregateByKey(ArrayTile.empty(IntConstantNoDataCellType, layout.tileCols, layout.tileRows))(stampLine, sumTiles)
        .mapValues { tile: MutableArrayTile => (tile: Tile) }
    val metadata = getMetadata(maxZoom)
    val contextRdd = new ContextRDD(createTiledRddFromLines(lines.rdd), metadata)

    val store = S3AttributeStore(output, layer_name)
    val writer = S3LayerWriter(store)
    val layoutScheme: ZoomedLayoutScheme = ZoomedLayoutScheme(WebMercator, tileSize)

    // Build the pyramid from maxZoom down to minZoom (Max resampling) and write
    // every level to S3, replacing any level that already exists.
    Pyramid.upLevels(contextRdd, layoutScheme, maxZoom, minZoom, Max) { (rdd, zoom) =>
      println("Pyramiding operation")
      val layerId: LayerId = LayerId(layer_name, zoom)
      if (store.layerExists(layerId)) {
        new S3LayerManager(store).delete(layerId)
      }
      writer.write(layerId, rdd, ZCurveKeyIndexMethod)
    }

    println("All processes finished.")
  } finally {
    _spark.stop()
  }
}
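
// A read-back sketch (not part of the original gist): assuming the layer written above,
// this is roughly how one tile could be fetched and rendered with the GeoTrellis 1.x
// S3 APIs. The bucket ("some-bucket"), layer name ("type7"), zoom level, SpatialKey and
// output path are placeholders mirroring the defaults used in SaveS3RasterLayer.
object ReadTileSketch {
  import geotrellis.raster._
  import geotrellis.raster.render._
  import geotrellis.spark.{LayerId, SpatialKey}
  import geotrellis.spark.io._
  import geotrellis.spark.io.s3.S3ValueReader

  def main(args: Array[String]): Unit = {
    // Point a value reader at the same bucket/prefix the writer used above.
    val reader = S3ValueReader("some-bucket", "type7").reader[SpatialKey, Tile](LayerId("type7", 4))
    val tile: Tile = reader.read(SpatialKey(8, 5)) // hypothetical key inside the zoom-4 key bounds

    // Colorize the summed line counts with quantile breaks and write the tile as a PNG.
    val colorMap = ColorMap.fromQuantileBreaks(tile.histogram, ColorRamps.BlueToRed)
    tile.renderPng(colorMap).write("/tmp/type7_tile.png")
  }
}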