@jpolchlo
Created February 19, 2020 16:24

A Model for Streaming Raster Computation

Goals

Desires:

  • Scalable
  • Reasonable descriptive interface (e.g., no JSON)
  • Extensible (e.g., rasters may be composed of values or spectra)
  • Multi-language (processing routines expressible in Python or Scala)

I'm after a system that can describe common raster operations and run them on a spectrum of hardware configurations. A single machine should be able to process large data sets, though slowly, without concern for crashing out. More hardware should equate to better performance without significant additional pain.

It's important to note that this isn't a project for doing everything under the sun. We will specialize in map operations, as opposed to reduce operations. If the goal is to transform spatial data to spatial data, this may work better than other solutions. (It is conceivable, though, that we could offer a fold operation where the base atom is a tile, with combineByKey-like semantics (Tile → Accum, Accum × Tile → Accum, Accum × Accum → Result).) This likely wouldn't be a solution for learning the parameters of an ML model, but it may be a good solution for applying an already-tuned model to data.
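The fold mentioned in the parenthetical could look roughly like the following. This is only a sketch: tiles are stood in for by plain lists of numbers, and the names `fold_tiles`, `create`, `merge_value`, and `merge_accums` are hypothetical, not part of any existing API. As in Spark's `combineByKey`, the merge step here returns another accumulator, and the final result is derived from it afterwards. The partitions merely mimic independent workers.

```python
from functools import reduce
from typing import Callable, Iterable, List, Sequence, TypeVar

Tile = Sequence[float]  # stand-in for a real raster tile (assumption)
A = TypeVar("A")        # accumulator type


def fold_tiles(
    partitions: Iterable[Iterable[Tile]],
    create: Callable[[Tile], A],          # Tile -> Accum
    merge_value: Callable[[A, Tile], A],  # Accum x Tile -> Accum
    merge_accums: Callable[[A, A], A],    # Accum x Accum -> Accum
) -> A:
    """Fold each partition independently, then merge the partial accumulators."""
    partials: List[A] = []
    for part in partitions:
        it = iter(part)
        try:
            first = next(it)
        except StopIteration:
            continue  # empty partition contributes nothing
        acc = create(first)
        for tile in it:
            acc = merge_value(acc, tile)
        partials.append(acc)
    if not partials:
        raise ValueError("no tiles to fold")
    return reduce(merge_accums, partials)


# Example: a global mean, carried as a (sum, count) accumulator.
def create(tile):
    return (sum(tile), len(tile))


def merge_value(acc, tile):
    return (acc[0] + sum(tile), acc[1] + len(tile))


def merge_accums(a, b):
    return (a[0] + b[0], a[1] + b[1])


partitions = [[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0]]]
total, count = fold_tiles(partitions, create, merge_value, merge_accums)
mean = total / count
```

Because each partial accumulator only ever sees one tile at a time, memory use stays bounded by the accumulator size rather than by the size of the data set.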

A word on words

This is intended to be a streaming system, but that may not be the right nomenclature. Streaming generally means a continuous reading process of some source file(s), which isn't exactly what I'm intending here. I'm more troubled by the resource utilization patterns that we exhibit for jobs that simply shouldn't require such a huge time burden. It may be better to say that there is a need for incremental processing, or localized processing. The idea is that we can find a way to focus our attention on individual areas, compute what is needed in those places, and commit the results before moving on to the next position. We might be drawing data from many sources, but the abstraction is the same irrespective of the source: geographic queries lead to small increments of data that can be handled in a nearly atomic way, interacting in a very limited spatial scope. This reduces the range of tasks that we can handle, but it means that we can focus on doing the supported tasks well.

The streaming idea is one of being able to handle data in an event-driven fashion. As imagery comes online, say, we can trigger a processing step. This implies a separation of this project into two components: a processing engine and a deployment strategy. The latter comprises the "streaming" idea. The former moves us somewhat away from Spark as a means of processing.

To Spark or Not to Spark?

This all starts to sound suspiciously like the capabilities of Spark. If we are to engage in this significant engineering effort, we need to provide a compelling story of why this is needed.

It's well known that we are least happy with the Spark component of GeoTrellis. In many ways this is a deployment story. Once the Spark cluster is standing, we are often good to go. But this is sometimes not the case. When things don't work right away, we struggle with the particulars of a job. We end up having to scale our hardware demands way past the point of reason, often experiencing repeated failures, each followed by an incremental modification to the hardware profile, resulting in long cycle times. It's often not clear why this has to happen, and the Spark tools don't make it easy to understand the failure mode.

There is also an issue of scale invariance that we deal with. Collections-based implementations are entirely distinct from Spark-based implementations. They may share flow, but they are not drop-in replacements. If we write a collections-based solution and the job grows past the capacity of our local hardware, we have no option to scale up unless we can find a better-appointed machine on which to run. We can solve this, possibly, by always implementing in the Spark model, but then we encounter all the weirdnesses of Spark, even for small jobs.

There has to be a better way!

Features of the System Architecture

Region based

We'll begin processing by considering all participating sources. These sources may come from a larger process of filtering down a huge catalog into a subset of scenes (say via a STAC query). But when we begin, we'll have a bounded input set. Each raster input has a footprint. We could do something similar with vector data.

An assumption here is that joins occur in a common spatial reference. We won't be compiling data from radically different parts of space in the same operation. In this way, we can focus on small regions at a time without blowing up our memory from unanticipated reads.

This is all to say that all of our inputs have spatial extent. Not all inputs will have the same projection, but they can all be considered in a common frame of reference. We think of this work as being inherently spatial. We can base our work on portions of space without a loss of generality.

The Cursor

We've had success in our focal operation implementations with a cursor model to exploit locality. I'd like to use the same general model to allow for incremental processing of a scene. After selecting a target area of interest, we can break that region up according to a grid pattern, in some desired projection and extent. We can then set an active grid cell to process. This cursor has an active cell and may have boundary cells, depending on the demands of the computation. We can convert a cursor cell into a tile. That tile may be multiband or have vector cell contents; the result may have a buffer.
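To make the cursor idea concrete, here is a minimal sketch. It assumes a regular grid of square cells and expresses the buffer as a whole number of neighboring cells. `Grid`, `Cursor`, and `cursors` are hypothetical names used only for illustration, and no actual tile reading is shown.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass(frozen=True)
class Grid:
    """A regular grid laid over the area of interest (square cells assumed)."""
    xmin: float
    ymin: float
    cell_size: float
    cols: int
    rows: int

    def cell_extent(self, col: int, row: int) -> Tuple[float, float, float, float]:
        """Return (xmin, ymin, xmax, ymax) of one grid cell."""
        x0 = self.xmin + col * self.cell_size
        y0 = self.ymin + row * self.cell_size
        return (x0, y0, x0 + self.cell_size, y0 + self.cell_size)


@dataclass(frozen=True)
class Cursor:
    """An active cell plus however many rings of boundary cells are needed."""
    grid: Grid
    col: int
    row: int
    buffer_cells: int = 0

    def neighborhood(self) -> List[Tuple[int, int]]:
        """Active cell and boundary cells, clipped to the grid, in row-major order."""
        b = self.buffer_cells
        row_range = range(max(0, self.row - b), min(self.grid.rows, self.row + b + 1))
        col_range = range(max(0, self.col - b), min(self.grid.cols, self.col + b + 1))
        return [(c, r) for r in row_range for c in col_range]


def cursors(grid: Grid, buffer_cells: int = 0) -> Iterator[Cursor]:
    """Visit every grid cell once, row by row."""
    for row in range(grid.rows):
        for col in range(grid.cols):
            yield Cursor(grid, col, row, buffer_cells)


grid = Grid(xmin=0.0, ymin=0.0, cell_size=256.0, cols=3, rows=2)
all_cursors = list(cursors(grid, buffer_cells=1))
corner = all_cursors[0]
```

A focal operation would request the cells in `neighborhood()`, assemble them into a buffered tile, compute, and commit only the active cell before the cursor advances.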

The cursor implementation will rely on caching. Blocks will be read into a block store, with a replacement policy (most likely LRU) applied once the memory budget is reached. These blocks will be combined to deliver the required tile result.
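A byte-budgeted LRU block store might be sketched like this. `BlockCache` and `fake_loader` are hypothetical. The sketch assumes blocks are opaque byte strings and that a loader callback performs the actual read.

```python
from collections import OrderedDict
from typing import Callable, Hashable


class BlockCache:
    """Keeps recently used blocks in memory, evicting the least recently used."""

    def __init__(self, budget_bytes: int, loader: Callable[[Hashable], bytes]):
        self.budget_bytes = budget_bytes
        self.loader = loader
        self.blocks: "OrderedDict[Hashable, bytes]" = OrderedDict()
        self.used_bytes = 0

    def get(self, key: Hashable) -> bytes:
        if key in self.blocks:
            self.blocks.move_to_end(key)  # mark as most recently used
            return self.blocks[key]
        block = self.loader(key)          # cache miss: read from the source
        self.blocks[key] = block
        self.used_bytes += len(block)
        # Evict from the least recently used end until we fit the budget,
        # but never evict the block we just loaded.
        while self.used_bytes > self.budget_bytes and len(self.blocks) > 1:
            _, evicted = self.blocks.popitem(last=False)
            self.used_bytes -= len(evicted)
        return block


reads = []


def fake_loader(key):
    """Stand-in for a real block read; every block is 100 bytes."""
    reads.append(key)
    return bytes(100)


cache = BlockCache(budget_bytes=250, loader=fake_loader)
for key in ["a", "b", "a", "c", "b"]:
    cache.get(key)
```

In this run the budget holds two blocks, so loading `"c"` evicts `"b"` (the least recently used), and the final request for `"b"` has to read it again.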

Vector data will be keyed to the output grid and cached in the same way. Vector data may be problematic: without some prioritization scheme, there is no natural bound on the amount of data that a grid cell can contain. Additionally, geometries can span multiple grid cells, so they must be deduplicated when cells are agglomerated to serve a buffered request.
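The deduplication step can be illustrated with a small sketch. It assumes each feature carries a unique identifier and is keyed by the cells its bounding box touches. The names `cells_for_bbox`, `index_features`, and `features_in_cells` are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Cell = Tuple[int, int]
BBox = Tuple[float, float, float, float]  # (xmin, ymin, xmax, ymax)
Feature = Tuple[str, BBox]                # (unique id, bounding box); an assumption


def cells_for_bbox(bbox: BBox, cell_size: float) -> List[Cell]:
    """All grid cells touched by a bounding box (grid origin at 0, 0)."""
    xmin, ymin, xmax, ymax = bbox
    c0, c1 = int(xmin // cell_size), int(xmax // cell_size)
    r0, r1 = int(ymin // cell_size), int(ymax // cell_size)
    return [(c, r) for r in range(r0, r1 + 1) for c in range(c0, c1 + 1)]


def index_features(features: Iterable[Feature], cell_size: float) -> Dict[Cell, List[Feature]]:
    """Key every feature to each cell it touches; spanning features are stored repeatedly."""
    index: Dict[Cell, List[Feature]] = {}
    for feature in features:
        for cell in cells_for_bbox(feature[1], cell_size):
            index.setdefault(cell, []).append(feature)
    return index


def features_in_cells(index: Dict[Cell, List[Feature]], cells: Iterable[Cell]) -> List[Feature]:
    """Gather features for several cells, returning each feature only once."""
    seen = set()
    result: List[Feature] = []
    for cell in cells:
        for feature in index.get(cell, []):
            if feature[0] not in seen:
                seen.add(feature[0])
                result.append(feature)
    return result


features = [("road", (5.0, 5.0, 15.0, 5.0)), ("well", (12.0, 2.0, 12.0, 2.0))]
index = index_features(features, cell_size=10.0)
merged = features_in_cells(index, [(0, 0), (1, 0)])
```

Here the road spans both cells and is stored under each, yet the buffered request for cells (0, 0) and (1, 0) returns it only once.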

Data Flow Model

Operations will be specified internally as a DAG, each node producing output from multiple inputs, with the output being routed potentially to multiple downstream nodes' inputs. The network layout allows for an understanding of object lifetime, and gives the ability to anticipate things like peak memory usage in a static way.
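As an illustration of the static analysis this enables, the sketch below estimates peak memory for one cursor step from the DAG alone. It assumes every node's output size is known up front, that nodes run one at a time in topological order, and that an output is freed as soon as its last consumer has run. `peak_memory` and the node names are hypothetical. `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List


def peak_memory(inputs: Dict[str, List[str]], out_bytes: Dict[str, int]) -> int:
    """Estimate peak live bytes; `inputs` maps each node to the nodes it reads."""
    order = list(TopologicalSorter(inputs).static_order())

    # Count how many downstream consumers each node's output still has.
    remaining = {node: 0 for node in order}
    for deps in inputs.values():
        for dep in deps:
            remaining[dep] += 1

    live = 0
    peak = 0
    for node in order:
        live += out_bytes[node]  # the output is allocated while inputs are still live
        peak = max(peak, live)
        for dep in inputs.get(node, []):
            remaining[dep] -= 1
            if remaining[dep] == 0:
                live -= out_bytes[dep]  # last consumer has run; free the input
        if remaining[node] == 0:
            live -= out_bytes[node]     # sink: committed and released immediately
    return peak


dag = {
    "red": [],
    "nir": [],
    "ndvi": ["red", "nir"],
    "smooth": ["ndvi"],
    "out": ["ndvi", "smooth"],
}
sizes = {name: 4 for name in dag}  # pretend every output is 4 MB
peak = peak_memory(dag, sizes)
```

With these sizes the estimate is 12: at most three outputs are alive at once, first `red`, `nir`, and `ndvi`, and later `ndvi`, `smooth`, and `out`.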

Data Sources

With a willingness to commit to a spatial computation paradigm, we no longer need to support arbitrary data sources. Key-value stores will be where it's at. This may require a user to preprocess certain unstructured inputs, such as GeoJSON, into a keyed store. We can provide a simple means of doing so for small inputs via local interfaces like SQLite. But for larger sources, it will fall to the user to do the extra legwork. This could require a GeoMesa deployment, or creating a vector tile cache.
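For the small-input case, a keyed store built on SQLite might look like the sketch below. It is limited to GeoJSON Point features for brevity. The table layout and the names `build_store` and `query_cell` are assumptions made for illustration, not a proposed schema.

```python
import json
import sqlite3
from typing import Dict, List


def build_store(features: List[Dict], cell_size: float, path: str = ":memory:") -> sqlite3.Connection:
    """Load GeoJSON Point features into a table keyed by grid cell."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE features (cell_col INTEGER, cell_row INTEGER, fid TEXT, geojson TEXT)"
    )
    conn.execute("CREATE INDEX by_cell ON features (cell_col, cell_row)")
    for feature in features:
        x, y = feature["geometry"]["coordinates"]  # Point geometries only (assumption)
        key = (int(x // cell_size), int(y // cell_size))
        conn.execute(
            "INSERT INTO features VALUES (?, ?, ?, ?)",
            (key[0], key[1], feature["id"], json.dumps(feature)),
        )
    conn.commit()
    return conn


def query_cell(conn: sqlite3.Connection, col: int, row: int) -> List[Dict]:
    """Fetch every feature keyed to one grid cell."""
    rows = conn.execute(
        "SELECT geojson FROM features WHERE cell_col = ? AND cell_row = ? ORDER BY fid",
        (col, row),
    )
    return [json.loads(r[0]) for r in rows]


def point(fid: str, x: float, y: float) -> Dict:
    """Build a minimal GeoJSON Point feature."""
    return {
        "type": "Feature",
        "id": fid,
        "geometry": {"type": "Point", "coordinates": [x, y]},
        "properties": {},
    }


conn = build_store([point("a", 3, 4), point("b", 12, 4), point("c", 18, 9)], cell_size=10.0)
found = [f["id"] for f in query_cell(conn, 1, 0)]
```

The grid-cell key makes a cursor's request a simple indexed lookup. Lines and polygons would need one row per touched cell, plus deduplication on read.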

Incremental Writes

A necessary component of this work is to allow for chunks of image data to be written out in a piecewise fashion. This will require some new infrastructure in our COG writer to allow for modification of an existing image at the tile level. In the absence of incrementally constructed TIFFs, we can most likely exploit the existing tile catalog infrastructure.

Multi-language Implementation

Python possesses a wide array of capabilities engendered by its deep community. We need the capacity to tap into that, even if we are implementing the bulk of our capabilities in Scala. We have a model of computation furnished by Spark, and bolstered by our recent experiences with Arrow. We can construct work nodes that call out to a Python process and share data via some common mechanism such as the Plasma object store. In this way, the burden of interprocess communication can be kept to a minimum. By using a shared data representation, it will also be possible to decrease (de)serialization penalties. This might include FlatBuffers representations of common data types, such as masked tensors. We will thus only need to provide translation services for these common formats at each language endpoint. Apache Arrow could provide the common substrate, allowing objects to be exchanged between languages with relative ease.

This is a critical feature for interop with Raster Vision.

Architectural Details

Computation nodes

Much as in the case of Catalyst, the computation is structured using nodes arranged in a DAG configuration. Many nodes will be provided to do common operations, but easy extensibility via new node creation will be available.

Nodes have a set of input types, and a single output type. We will try to rely on Arrow type declarations here, naturally relying on extension types for custom data types.

Nodes carry an exec method to process inputs.
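A node along these lines could be sketched as follows. Plain Python types stand in for the Arrow type declarations mentioned above, and everything except the `exec` method name (`Node`, `fn`, the NDVI example) is a hypothetical illustration rather than a proposed interface.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Node:
    """A computation node: typed inputs, a single typed output, and an exec method."""
    name: str
    input_types: Tuple[type, ...]  # stand-ins for Arrow type declarations (assumption)
    output_type: type
    fn: Callable[..., Any]

    def exec(self, *inputs: Any) -> Any:
        """Check the inputs against the declared types, run, and check the output."""
        if len(inputs) != len(self.input_types):
            raise TypeError(
                f"{self.name}: expected {len(self.input_types)} inputs, got {len(inputs)}"
            )
        for value, expected in zip(inputs, self.input_types):
            if not isinstance(value, expected):
                raise TypeError(f"{self.name}: expected {expected.__name__}")
        result = self.fn(*inputs)
        if not isinstance(result, self.output_type):
            raise TypeError(f"{self.name}: output is not {self.output_type.__name__}")
        return result


def ndvi_fn(red, nir):
    """Normalized difference of two bands, cell by cell."""
    return [(n - r) / (n + r) for r, n in zip(red, nir)]


ndvi = Node(name="ndvi", input_types=(list, list), output_type=list, fn=ndvi_fn)
result = ndvi.exec([1.0, 2.0], [3.0, 2.0])
```

Declaring the types on the node, rather than discovering them at run time, is what would let the DAG be validated before any data is read.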
