MultibandTile is a stack of tiles, intended mostly for multi-spectral imagery. SpaceTimeKey has a time instant as a member, so it represents a point in time. In this manner what is most readily represented is a multiband stack rather than a cube, since we say nothing about temporal resolution. However, nothing stops you from placing your own meaning on the band index and treating each index i as an increment of d*i time units from the instant of the key.
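For illustration, that convention could be captured in a tiny helper. This is only a sketch: `BandTime`, `bandInstant`, and the step `d` are hypothetical names, not GeoTrellis API.

```scala
// Hypothetical helper: interpret band i of a tile keyed at `keyInstant`
// as a sample taken d*i milliseconds after that instant.
object BandTime {
  def bandInstant(keyInstant: Long, d: Long, i: Int): Long =
    keyInstant + d * i
}
```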
If you find that kind of approach kludgy, it would be possible to create a new value type, MultibandCube or TimeCube, that wraps Seq[MultibandTile] and makes it behave as a cube. In order to read/write such RDDs you would only need to provide an AvroRecordCodec type class instance, which could be built with relative ease from the codecs already implemented. This is a use case we envisioned but have not fleshed out yet. If you end up going down this road, I think this would be an excellent contribution that could be rolled into GeoTrellis master as you work on it.
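To make the idea concrete, here is a minimal sketch of what such a wrapper could look like. It is kept generic over the slice type so it is self-contained; in GeoTrellis the slices would be MultibandTile, and the names `TimeCube`, `step`, and `sliceAt` are assumptions, not an existing API.

```scala
// Hypothetical TimeCube sketch: wraps a sequence of per-timestep slices
// and resolves a slice by time offset from the key's instant.
// `instant` is the start time, `step` the duration of each slice in ms.
case class TimeCube[T](instant: Long, step: Long, slices: Vector[T]) {
  def depth: Int = slices.length

  // The slice whose time interval contains `t`, if any.
  def sliceAt(t: Long): Option[T] = {
    val i = ((t - instant) / step).toInt
    if (t >= instant && i < slices.length) Some(slices(i)) else None
  }
}
```

An AvroRecordCodec for this shape could delegate to the existing MultibandTile codec per slice.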
Overall we make an attempt to express as much of the algorithms as possible through type classes, so that new value and key types can be adopted incrementally in more parts of the library by implementing the relevant type classes. For instance, for MultibandCube to be mergeable it would need an instance of TileMergeMethods: https://github.com/geotrellis/geotrellis/blob/2402607b0c6f5a4e087bc46fe32c5f56e206947e/raster/src/main/scala/geotrellis/raster/merge/TileMergeMethods.scala
Which, again, in this case could be expressed in terms of existing functionality.
MultibandTile is an indexed stack of Tiles (each representing a band) which all share the same pixel count and cell type, and, by implication, the same spatial extent when wrapped in Raster[MultibandTile]. Nothing imposes meaning on the band index, so the bands can be anything. So far we've used them in the predictable way of reading multi-spectral images.
We can read multiband GeoTiffs like so:
import geotrellis.raster.MultibandTile
import geotrellis.spark.io.hadoop._
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ???
val source: RDD[(ProjectedExtent, MultibandTile)] = sc.hadoopMultibandGeoTiffRDD("hdfs://namenode:8020/dir/")
That uses MultibandGeoTiffInputFormat
under the covers and doesn't really require any configuration.
Reading temporal GeoTiffs is a little trickier. There doesn't seem to be a standard for which TIFF tag carries the timestamp. There is an sc.hadoopTemporalMultibandGeoTiffRDD method that assumes the tag is GEOTIFF_TIME_TAG, but that's essentially a default we used in the past. This is an area where more sugar should be added for a nicer API. But here is the verbose way to use it:
import geotrellis.raster.MultibandTile
import geotrellis.spark.TemporalProjectedExtent
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ???
val path: Path = new Path("hdfs://namenode:8020/dirs")
val conf: Configuration = sc.hadoopConfiguration.withInputDirectory(path, List("*.tiff"))
TemporalGeoTiffInputFormat.setTimeTag(conf, "SCENE_TIME")
TemporalGeoTiffInputFormat.setTimeFormat(conf, "YYYY:MM:dd HH:mm:ss")
val source: RDD[(TemporalProjectedExtent, MultibandTile)] =
  sc.newAPIHadoopRDD(conf,
    classOf[TemporalMultibandGeoTiffInputFormat],
    classOf[TemporalProjectedExtent],
    classOf[MultibandTile])
Having read some unstructured GeoTiffs isn't much use, since we have no way to join one set to another or save them in a meaningful way. So the thing to do is to tile them to a specific layout. We can do that like so:
val tileLayerMetadata: TileLayerMetadata[SpaceTimeKey] = source.collectMetadata[SpaceTimeKey](FloatingLayoutScheme(512))
val tiledRdd: RDD[(SpaceTimeKey, MultibandTile)] = source.tileToLayout[SpaceTimeKey](tileLayerMetadata, Bilinear)
There are a number of overloads to collectMetadata that allow you to specify the things you already know:
https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/package.scala#L127-L147
collectMetadata will actually fetch the data and reduce over it to find these things out. If you know a fair bit about the structure of your input, there is a decent chance you will be able to construct TileLayerMetadata directly and avoid this costly step.
The two choices for a layout scheme are FloatingLayoutScheme, which can be at any (floating) resolution, and ZoomedLayoutScheme, which represents a TMS pyramid. A TMS pyramid has a resolution for each zoom level, implied by the CRS and the tile size used.
The process of tiling / reprojecting / pyramiding / saving is called the ingest process. I think it's important to be able to break the process up into these steps so the data can be handled in between. However, if you find yourself in a standard case, you can check out this helper class: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/ingest/MultibandIngest.scala
case class SpaceTimeKey(col: Int, row: Int, instant: Long)
col and row are from the LayoutDefinition specified in the metadata, a non-overlapping raster grid. instant is the instant in time, in milliseconds. Unlike space, time is not effectively gridded here. It will be gridded when writing, but in memory a SpaceTimeKey is essentially a point in time.
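To show what gridding time at write time amounts to, here is a sketch of snapping an instant onto temporal buckets. The bucket size is a parameter you would choose yourself; `TimeGrid` and its methods are illustrative names, not GeoTrellis API.

```scala
// Hypothetical snapping of a millisecond instant onto a temporal grid.
// bucketMillis is the chosen temporal resolution, e.g. 86400000L for one day.
object TimeGrid {
  // Integer bucket index along the time axis.
  def bucket(instant: Long, bucketMillis: Long): Long =
    instant / bucketMillis

  // Start of the bucket containing `instant`.
  def snap(instant: Long, bucketMillis: Long): Long =
    bucket(instant, bucketMillis) * bucketMillis
}
```

Two instants that land in the same bucket become indistinguishable after snapping, which is exactly the index-collision scenario discussed below for space filling curves.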
Once you have tiles and keys, doing joins becomes trivial. Merging one RDD into another is essentially a Spark cogroup operation. There is nice sugar for it:
val rdd1: RDD[(SpaceTimeKey, MultibandTile)] = ???
val rdd2: RDD[(SpaceTimeKey, MultibandTile)] = ???
val updated = rdd1.merge(rdd2)
Eventually this will call apply on the TileRDDMerge object: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/merge/TileRDDMerge.scala#L12
There is a detail here: pair RDDs don't give us any guarantee of key uniqueness, so when doing a merge we have to consider the possibility that there are multiple tiles with the same key on both the left and the right side. The rule we follow is to merge duplicates on the left and right independently, and then merge the right into the left.
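That rule can be sketched over plain collections, leaving Spark out of it. `PairMerge.mergeAll` and the value-level `merge` function here are stand-ins for the cogroup plus TileMergeMethods machinery, not the actual TileRDDMerge code.

```scala
// Hypothetical sketch of the merge rule for pair collections with
// possibly duplicated keys: first merge duplicates within each side,
// then merge right values into left values per key.
object PairMerge {
  def mergeAll[K, V](left: Seq[(K, V)], right: Seq[(K, V)])(merge: (V, V) => V): Map[K, V] = {
    def collapse(xs: Seq[(K, V)]): Map[K, V] =
      xs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(merge) }
    val l = collapse(left)
    val r = collapse(right)
    // Keys only on the left pass through; shared keys merge right into left.
    l ++ r.map { case (k, rv) => k -> l.get(k).fold(rv)(lv => merge(lv, rv)) }
  }
}
```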
To write the images you need a layer writer; here is a snippet for constructing various kinds:
import geotrellis.spark.LayerId
import geotrellis.spark.io._
import geotrellis.spark.io.accumulo._
import geotrellis.spark.io.file.FileLayerWriter
import geotrellis.spark.io.s3.S3LayerWriter
import java.net.URI
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.hadoop.conf.Configuration

def writer(): LayerWriter[LayerId] = output match {
  case "s3" =>
    S3LayerWriter(params("bucket"), params("prefix"))
  case "file" =>
    FileLayerWriter(params("path"))
  case "accumulo" =>
    val zookeeper: String = params.get("zookeeper").getOrElse {
      val conf = new Configuration // if not specified, assume zookeeper is on the same host as the DFS master
      new URI(conf.get("fs.defaultFS")).getHost
    }
    val instance = AccumuloInstance(params("instance"), zookeeper, params("user"), new PasswordToken(params("password")))
    val strategy = params.get("ingestPath") match {
      case Some(path) => HdfsWriteStrategy(path)
      case None => SocketWriteStrategy()
    }
    AccumuloLayerWriter(instance, params("table"), strategy)
}
The layer writer needs to know the metadata for the RDDs it is writing. If we look at the write method: https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/io/LayerWriter.scala#L39
we first see that it is looking for an RDD[(K, V)] with Metadata[M]. (This is the same thing that the layer reader will give back to us.)
We can make an object with that signature by joining the TileLayerMetadata with our RDD:
val rdd: RDD[(SpaceTimeKey, MultibandTile)] = ???
val md: TileLayerMetadata[SpaceTimeKey] = ???
val layer: RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] = ContextRDD(rdd, md)
The other thing we will need to provide is a KeyIndex or KeyIndexMethod. Ultimately our query is going to need to use a B-Tree index, and SpaceTimeKey does not imply any way to order our records sequentially. We can solve this problem by using a space filling curve.
In the spatiotemporal case it's time to pay the piper and settle on the granularity of time, since an SFC requires integer coordinates. You can trace the effects using ZCurveKeyIndexMethod as an example:
https://github.com/geotrellis/geotrellis/blob/master/spark/src/main/scala/geotrellis/spark/io/index/ZCurveKeyIndexMethod.scala
This has a couple of consequences:
- You get improved locality from SFC properties
- You can have index collisions, depending on your resolution choices
Index collisions aren't fatal: on writing, the records are sorted by their index and any collisions are written as a sequence of (K, V) pairs.
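To make the locality idea concrete, here is a sketch of a 3D Z-order (Morton) index over (col, row, time bucket). The bit-interleaving scheme and the 10-bits-per-dimension limit are illustrative assumptions; this is not the actual ZCurveKeyIndexMethod implementation.

```scala
// Hypothetical 3D Z-order index: spreads the low 10 bits of each
// coordinate so they occupy every third bit, then interleaves the three.
object ZIndex3D {
  private def spread(v: Long): Long = {
    var x = v & 0x3ffL // keep 10 bits
    x = (x | (x << 16)) & 0xff0000ffL
    x = (x | (x << 8))  & 0x0300f00fL
    x = (x | (x << 4))  & 0x030c30c3L
    x = (x | (x << 2))  & 0x09249249L
    x
  }

  def index(col: Int, row: Int, timeBucket: Long): Long =
    spread(col) | (spread(row) << 1) | (spread(timeBucket) << 2)
}
```

Two keys whose instants fall into the same time bucket produce the same index: that is exactly the collision case above, resolved at write time by storing a sequence of records under one index value.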
Reading happens as you imagine. Thankfully there is already a section of the docs for it: https://github.com/geotrellis/geotrellis/blob/master/docs/spark/spark-io.md Hopefully that covers the questions you may have.
You're asking about this class: https://github.com/geotrellis/geotrellis/blob/2402607b0c6f5a4e087bc46fe32c5f56e206947e/spark/src/main/scala/geotrellis/spark/mapalgebra/local/temporal/Implicits.scala#L18
Since we're working with a collection that has a time component, one thing we might want to do is a window operation. That is, we define an interval and apply some operation to all values in that interval (e.g. min, max, variance).
The TemporalWindow class is a method extension used to create a TemporalWindowState, which is a builder for such an operation. Ultimately what is going to happen is:
- map each key to some integer window ID, creating duplicate keys
- fold over resulting records applying some operation on the values
- return minimum key for each window with the result of the operation
So, to clarify: the temporal window represents an operation and its eventual result rather than a data structure.
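The three steps above can be sketched over a plain collection. `TemporalWindowSketch`, `windowMillis`, and the aggregation function are assumed names and parameters; this is an illustration of the shape of the operation, not the actual TemporalWindowState implementation.

```scala
// Hypothetical sketch of the window operation: assign each (instant, value)
// record an integer window ID, fold values per window, and report each
// window under the minimum instant it contains.
object TemporalWindowSketch {
  def aggregate[V](records: Seq[(Long, V)], windowMillis: Long)(op: (V, V) => V): Map[Long, V] =
    records
      .groupBy { case (instant, _) => instant / windowMillis } // step 1: window ID
      .map { case (_, rs) =>
        val minKey = rs.map(_._1).min        // step 3: representative key
        val folded = rs.map(_._2).reduce(op) // step 2: fold values, e.g. min/max
        minKey -> folded
      }
}
```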