@echeipesh
Last active May 26, 2016 21:06
GeoTrellis Temporal MultiBand

MultibandTile is a stack of tiles, intended mostly for multi-spectral imagery. SpaceTimeKey has a time instant as a member, so it represents a point in time. Consequently what is most readily represented is a multiband stack at a point in time, rather than a cube (since nothing is said about temporal resolution).

However, nothing stops you from placing your own meaning on the band index and treating each index i as an increment of d*i time units from the instant of the key.
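That convention can be sketched in plain Scala; the step size `d` and the helper name are assumptions for illustration, not GeoTrellis API:

```scala
// Hypothetical convention: band i of a tile keyed at `keyInstant`
// represents time keyInstant + i * stepMillis.
val stepMillis: Long = 24L * 60 * 60 * 1000 // assume daily slices

def bandInstant(keyInstant: Long, bandIndex: Int): Long =
  keyInstant + bandIndex * stepMillis

// Band 3 of a tile keyed at the epoch falls three days in:
val t = bandInstant(0L, 3) // 259200000L
```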

If you find that kind of approach kludgy, it would be possible to create a new value type MultibandCube or TimeCube: something that wraps Seq[MultibandTile] and makes it behave as a cube. In order to read/write such RDDs you would only need to provide an AvroRecordCodec type class instance, which could be built with relative ease from the codecs already implemented.
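A minimal sketch of such a wrapper in plain Scala, with a type parameter standing in for MultibandTile; the name TimeCube and its fields are assumptions, not existing GeoTrellis types:

```scala
// Hypothetical TimeCube: a regularly spaced stack of temporal slices.
// T would be MultibandTile in practice; startInstant/stepMillis pin
// the slice index to the time axis.
case class TimeCube[T](slices: Vector[T], startInstant: Long, stepMillis: Long) {
  def sliceAt(instant: Long): Option[T] = {
    val offset = instant - startInstant
    val i = (offset / stepMillis).toInt
    if (i >= 0 && i < slices.length && offset % stepMillis == 0)
      Some(slices(i))
    else None
  }
}

val cube = TimeCube(Vector("jan", "feb", "mar"), startInstant = 0L, stepMillis = 100L)
cube.sliceAt(200L) // Some("mar")
cube.sliceAt(250L) // None: falls between slices
```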

This is a use case we envisioned but have not fleshed out yet. If you end up going down this road I think this would be an excellent contribution that could be rolled into GeoTrellis master as you work on it.

Overall we attempt to express as much of the algorithms as possible through type classes, so that new value and key types can be adopted incrementally by implementing the relevant type classes. For instance, for MultibandCube to be mergeable it would need an instance of TileMergeMethods: https://github.com/geotrellis/geotrellis/blob/2402607b0c6f5a4e087bc46fe32c5f56e206947e/raster/src/main/scala/geotrellis/raster/merge/TileMergeMethods.scala which again could be expressed in terms of existing functionality.

What is MultiBand image

https://github.com/geotrellis/geotrellis/blob/master/raster/src/main/scala/geotrellis/raster/MultibandTile.scala

MultibandTile is an indexed stack of Tiles (each representing a band) which all share the same pixel dimensions and cell type, and by implication the same spatial extent when wrapped in Raster[MultibandTile]. Nothing imposes meaning on the band index, so the bands can be anything. So far we've used them in the predictable way of reading multi-spectral images.
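For a concrete feel, here is a hedged sketch of constructing and indexing one; the values are arbitrary, but ArrayMultibandTile, band, and bandCount are the actual raster API:

```scala
import geotrellis.raster.{ArrayMultibandTile, IntArrayTile, MultibandTile}

// Two 2x2 single-band tiles stacked into a multiband tile; all bands
// must share the same dimensions and cell type.
val red = IntArrayTile(Array(1, 2, 3, 4), 2, 2)
val nir = IntArrayTile(Array(5, 6, 7, 8), 2, 2)
val mb: MultibandTile = ArrayMultibandTile(red, nir)

mb.bandCount         // 2
mb.band(1).get(0, 0) // 5
```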

Sourcing MultiBand images

We can read multiband GeoTiffs like so:

import geotrellis.raster.MultibandTile
import geotrellis.spark.io.hadoop._
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ???
val source: RDD[(ProjectedExtent, MultibandTile)] = sc.hadoopMultibandGeoTiffRDD("hdfs://namenode:8020/dir/")

That uses MultibandGeoTiffInputFormat under the covers and doesn't really require any configuration.

Reading temporal GeoTiffs is a little trickier. There doesn't seem to be a standard for which TIFF tag holds the timestamp. There is an sc.hadoopTemporalMultibandGeoTiffRDD method that assumes the tag is GEOTIFF_TIME_TAG, but that's essentially a default we used in the past. This is an area where more sugar should be added for a nicer API. Here is the verbose way to use it:

import geotrellis.raster.MultibandTile
import geotrellis.spark.TemporalProjectedExtent
import geotrellis.spark.io.hadoop._
import geotrellis.spark.io.hadoop.formats._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ???

val path: Path = new Path("hdfs://namenode:8020/dirs")
val conf: Configuration = sc.hadoopConfiguration.withInputDirectory(path, List("*.tiff"))

TemporalGeoTiffInputFormat.setTimeTag(conf, "SCENE_TIME")
TemporalGeoTiffInputFormat.setTimeFormat(conf, "yyyy:MM:dd HH:mm:ss")

val source: RDD[(TemporalProjectedExtent, MultibandTile)] =
    sc.newAPIHadoopRDD(conf,
        classOf[TemporalMultibandGeoTiffInputFormat],
        classOf[TemporalProjectedExtent],
        classOf[MultibandTile])

Tiling Multiband images

Having read some unstructured GeoTiffs isn't much use, since we have no way to join one set to another or save them in a meaningful way. So the thing to do is to tile them to a specific layout. We can do it like so:

val tileLayerMetadata: TileLayerMetadata[SpaceTimeKey] = source.collectMetadata[SpaceTimeKey](FloatingLayoutScheme(512))
val tiledRdd: RDD[(SpaceTimeKey, MultibandTile)] = source.tileToLayout[SpaceTimeKey](tileLayerMetadata, Bilinear)

There are a number of overloads to collectMetadata that allow you to specify some of the things you know: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/package.scala#L127-L147

collectMetadata will actually fetch the data and reduce through it to find these things out. If you know a fair bit about the structure of your input, there is a decent chance you can construct TileLayerMetadata directly and avoid this costly step.

The two choices of layout scheme are FloatingLayoutScheme, which can be at any (floating) resolution, and ZoomedLayoutScheme, which represents a TMS pyramid. A TMS pyramid has a resolution for each tier, implied by the CRS and the tile size used.
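To make the pyramid idea concrete, here is the resolution arithmetic in plain Scala, assuming a WebMercator-like square world extent and 256-pixel tiles (numbers chosen for illustration, not pulled from ZoomedLayoutScheme itself):

```scala
// Approximate width of the WebMercator world extent in meters.
val worldWidth = 2 * 20037508.342789244
val tileSize = 256

// At zoom z a TMS pyramid has 2^z x 2^z tiles, so the cell resolution
// is fixed by the CRS extent and tile size alone.
def tilesAcross(zoom: Int): Long = 1L << zoom

def resolution(zoom: Int): Double =
  worldWidth / (tileSize.toLong * tilesAcross(zoom))

tilesAcross(2) // 4
resolution(0)  // ~156543 meters per pixel, halving at each zoom
```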

Ingest Process

The process of tiling / reprojecting / pyramiding / saving is called an ingest process. I think it's important to be able to break the process into these steps so the data can be handled in between. However, if you find yourself in a standard case you can check out this helper class: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/ingest/MultibandIngest.scala

SpaceTimeKey

case class SpaceTimeKey(col: Int, row: Int, instant: Long)

col and row are coordinates in the LayoutDefinition specified in the metadata, a non-overlapping raster grid. instant is the instant in time in milliseconds. Unlike space, time is not effectively gridded. It will be gridded when writing, but in memory a SpaceTimeKey is essentially a point in time.
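For instance, constructing such a key in plain Scala (using java.time to produce the millisecond instant, and redeclaring the case class from above locally):

```scala
import java.time.{ZonedDateTime, ZoneOffset}

// Millisecond instant for 2016-05-01T00:00Z.
val instant: Long =
  ZonedDateTime.of(2016, 5, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant.toEpochMilli

// A key addressing grid cell (10, 20) at that moment, mirroring the
// case class shown above.
case class SpaceTimeKey(col: Int, row: Int, instant: Long)
val key = SpaceTimeKey(10, 20, instant)
```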

Updating MultiBand images

Once you have tiles and keys, joins become trivial. Merging one RDD into another is essentially a Spark cogroup operation. There is nice sugar for it:

val rdd1: RDD[(SpaceTimeKey, MultibandTile)] = ???
val rdd2: RDD[(SpaceTimeKey, MultibandTile)] = ???

val updated = rdd1.merge(rdd2)

Eventually this is going to call apply on the TileRDDMerge object: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/merge/TileRDDMerge.scala#L12

There is a detail here: pair RDDs don't give us any guarantee of key uniqueness. So when doing a merge we have to consider the possibility that there will be multiple tiles with the same key on the left and right sides. The rule we follow is to merge the left and right sides independently and then merge right into left.
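That rule can be modeled with plain Scala collections; string concatenation stands in for tile merge so the merge order stays visible, and none of these names are GeoTrellis API:

```scala
// Stand-in for Tile merge; concatenation makes the order observable.
def mergeTiles(a: String, b: String): String = a + "+" + b

// Collapse duplicate keys within one side.
def mergeSide(side: Seq[(Int, String)]): Map[Int, String] =
  side.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(mergeTiles) }

// Merge duplicates on each side first, then merge right into left.
def merge(left: Seq[(Int, String)], right: Seq[(Int, String)]): Map[Int, String] = {
  val l = mergeSide(left)
  val r = mergeSide(right)
  l ++ r.map { case (k, v) =>
    k -> l.get(k).map(mergeTiles(_, v)).getOrElse(v)
  }
}

merge(Seq(1 -> "a", 1 -> "b"), Seq(1 -> "c")) // Map(1 -> "a+b+c")
```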

Saving / Querying Multiband images

To write the images you need a layer writer, here is a snippet for constructing various kinds:

import java.net.URI
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.hadoop.conf.Configuration
import geotrellis.spark.io._
import geotrellis.spark.io.accumulo._
import geotrellis.spark.io.file.FileLayerWriter
import geotrellis.spark.io.s3.S3LayerWriter

def writer(): LayerWriter[LayerId] = output match {
  case "s3" =>
    S3LayerWriter(params("bucket"), params("prefix"))
  case "file" =>
    FileLayerWriter(params("path"))
  case "accumulo" =>
    val zookeeper: String = params.get("zookeeper").getOrElse {
      val conf = new Configuration // if not specified assume zookeeper is same as DFS master
      new URI(conf.get("fs.defaultFS")).getHost
    }
    val instance = AccumuloInstance(params("instance"), zookeeper, params("user"), new PasswordToken(params("password")))
    val strategy = params.get("ingestPath") match {
      case Some(path) => HdfsWriteStrategy(path)
      case None => SocketWriteStrategy()
    }
    AccumuloLayerWriter(instance, params("table"), strategy)
  case other =>
    sys.error(s"Unsupported output backend: $other")
}

The layer writer needs to know the metadata of the RDD it is writing. If we look at the write method: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/io/LayerWriter.scala#L39 we see that it is looking for RDD[(K, V)] with Metadata[M]. (This is the same thing the layer reader will give back to us.)

We can make an object with that signature by joining the TileLayerMetadata with our RDD in:

val rdd: RDD[(SpaceTimeKey, MultibandTile)] = ???
val md: TileLayerMetadata[SpaceTimeKey] = ???

val layer: RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = ContextRDD(rdd, md)

Indexing

The other thing we will need to do is provide a KeyIndex or KeyIndexMethod. Ultimately our query is going to need to use a B-Tree index, and SpaceTimeKey does not imply any way to order our records sequentially. We can solve this problem by using a space-filling curve.

In the spatiotemporal case it's time to pay the piper and settle on the granularity of time, since an SFC requires integer coordinates. You can trace the effects using ZCurveKeyIndexMethod as an example here: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/io/index/ZCurveKeyIndexMethod.scala

This has a couple of consequences:

  • You get improved locality from SFC properties
  • You can have index collisions depending on your resolution choices

Index collisions aren't fatal: when writing, the records will be sorted by their index and any collisions will be written as a sequence of (K,V) pairs.
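For example, with a by-day temporal resolution, two keys whose instants fall on the same day get the same temporal coordinate. This is plain arithmetic mirroring the granularity choice, not GeoTrellis API:

```scala
val millisPerDay = 86400000L

// Collapse an instant to a day index: the integer temporal coordinate
// fed to the space-filling curve once a granularity is chosen.
def dayIndex(instant: Long): Long = instant / millisPerDay

// Two observations six hours apart on the same day collide:
val morning = 6L * 3600000  // 06:00 on day 0
val noon    = 12L * 3600000 // 12:00 on day 0
dayIndex(morning) == dayIndex(noon) // true
```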

Reading and Querying

Reading happens as you would imagine. Thankfully there is already a section of the docs for it: https://github.com/geotrellis/geotrellis/blob/master/docs/spark/spark-io.md Hopefully that covers the questions you may have.

Temporal Window

You're asking about this class: https://github.com/geotrellis/geotrellis/blob/2402607b0c6f5a4e087bc46fe32c5f56e206947e/spark/src/main/scala/geotrellis/spark/mapalgebra/local/temporal/Implicits.scala#L18

Since we're working with a collection that has a time component, one thing we might want to do is a window operation. That is, we define an interval and apply some operation to all values in that interval (e.g. min, max, variance).

The TemporalWindow class is a method extension that creates a TemporalWindowState, which is a builder for such an operation. Ultimately what is going to happen is:

  • map each key to some integer window ID, creating duplicate keys
  • fold over resulting records applying some operation on the values
  • return minimum key for each window with the result of the operation
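The steps above can be modeled with plain Scala collections, using (instant, value) pairs and min as the example operation; none of these names are GeoTrellis API:

```scala
val windowMillis = 100L

// Step 1: map each instant to an integer window ID.
def windowId(instant: Long): Long = instant / windowMillis

// Steps 2-3: fold each window's values with the operation, returning
// the minimum instant of the window alongside the result.
def windowOp(records: Seq[(Long, Int)], op: (Int, Int) => Int): Map[Long, Int] =
  records
    .groupBy { case (instant, _) => windowId(instant) }
    .map { case (_, rs) =>
      rs.map(_._1).min -> rs.map(_._2).reduce(op)
    }

windowOp(Seq(10L -> 5, 60L -> 3, 150L -> 7), math.min) // Map(10 -> 3, 150 -> 7)
```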

So to clarify: the temporal window represents an operation and its eventual result rather than a data structure.
