@ian-r-rose
Created December 7, 2021 18:14
Notes on High-Level-Graph-ication

The essential idea behind a high-level graph is this: it is a lazy mapping that can produce low-level Dask task graphs on demand. Until those low-level tasks are produced (a step called "materialization"), high-level graphs offer a few advantages (a short example follows the list below):

  1. They allow for higher level reasoning about graph structure, including optimizations that would be challenging or impossible once the graph is represented by many low-level tasks.
  2. They can be used to produce only the keys necessary for a computation. That is, later operations like slicing can feed back into previous HLG layers and allow them to skip producing tasks that won't be needed (called HLG culling). This can save significant time and memory.
  3. They can be much cheaper to serialize and communicate than low level task graphs.
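
As a quick orientation, here is a minimal sketch of inspecting the high-level graph behind a Dask array computation. The exact layer names and task counts depend on the Dask version, and `dict(hlg)` is used here only to demonstrate what materialization produces:

    import dask.array as da

    x = da.ones((100, 100), chunks=(10, 10))
    y = (x + 1).sum(axis=0)

    hlg = y.__dask_graph__()       # a HighLevelGraph, not a plain dict
    print(type(hlg))               # dask.highlevelgraph.HighLevelGraph
    print(list(hlg.layers))        # the named, still-lazy layers

    # Materialization: converting the lazy layers into concrete low-level tasks.
    low_level = dict(hlg)          # forces every layer to generate its tasks
    print(len(low_level))          # number of individual tasks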

However, HLG layers have proven difficult to write. Broadly speaking, the difficulties fall into two categories: algorithmic (specifically around culling) and serialization.

Culling high-level graphs

Writing an HLG layer is an exercise in not forcing materialization of the graph until absolutely necessary. Triggering materialization largely obviates the advantages above, and it is easy to do by accident. Currently this is poorly documented, and I think the best place to look is the HighLevelGraph.cull method:

    # HighLevelGraph.cull
    def cull(self, keys: Iterable) -> "HighLevelGraph":
        """Return new HighLevelGraph with only the tasks required to calculate keys.
        In other words, remove unnecessary tasks from dask.
        Parameters
        ----------
        keys
            iterable of keys or nested list of keys such as the output of
            ``__dask_keys__()``
        Returns
        -------
        hlg: HighLevelGraph
            Culled high level graph
        """
        keys_set = set(flatten(keys))

        all_ext_keys = self.get_all_external_keys()
        ret_layers = {}
        ret_key_deps = {}
        for layer_name in reversed(self._toposort_layers()):
            layer = self.layers[layer_name]
            # Let's cull the layer to produce its part of `keys`.
            # Note: use .intersection rather than & because the RHS is
            # a collections.abc.Set rather than a real set, and using &
            # would take time proportional to the size of the LHS, which
            # if there is no culling can be much bigger than the RHS.
            output_keys = keys_set.intersection(layer.get_output_keys())
            if output_keys:
                culled_layer, culled_deps = layer.cull(output_keys, all_ext_keys)
                # Update `keys` with all layer's external key dependencies, which
                # are all the layer's dependencies (`culled_deps`) excluding
                # the layer's output keys.
                external_deps = set()
                for d in culled_deps.values():
                    external_deps |= d
                external_deps -= culled_layer.get_output_keys()
                keys_set |= external_deps

                # Save the culled layer and its key dependencies
                ret_layers[layer_name] = culled_layer
                ret_key_deps.update(culled_deps)

        # Converting dict_keys to a real set lets Python optimise the set
        # intersection to iterate over the smaller of the two sets.
        ret_layers_keys = set(ret_layers.keys())
        ret_dependencies = {
            layer_name: self.dependencies[layer_name] & ret_layers_keys
            for layer_name in ret_layers
        }

        return HighLevelGraph(ret_layers, ret_dependencies, ret_key_deps)

This iterates over the layers in reverse topological order (that is, from the last operation to the first), intersects the set of needed keys with each layer's output keys, asks the layer to cull itself down to just those keys, and then adds the layer's external dependencies to the needed-key set so that earlier layers are culled appropriately.
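
To see the effect, here is a small sketch: slicing off the first block of an array and culling should leave only a handful of tasks instead of one per block. The exact task counts printed depend on the Dask version:

    import dask.array as da

    x = da.ones((100,), chunks=(10,))   # 10 blocks
    y = x + 1
    first = y[:10]                      # only the first block is actually needed

    hlg = first.__dask_graph__()
    culled = hlg.cull(first.__dask_keys__())

    # The culled graph materializes to far fewer tasks than the full graph:
    print(len(dict(hlg)), len(dict(culled)))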

The important thing to note about this implementation is that it calls Layer.get_output_keys() and Layer.cull(). These are the methods that absolutely must not materialize the layer! Figuring out an algorithm for implementing them is often quite challenging, and the existing low-level implementations may not translate well, since it requires inverting how you think about inputs and outputs.
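
As a rough illustration, here is a minimal, hypothetical layer (not part of Dask) whose get_output_keys() and cull() work purely from metadata. It assumes the Layer interface as it existed around the dask versions discussed here (e.g., that Layer.__init__ accepts an annotations keyword); a real layer would need to handle much more (multi-dimensional keys, annotations, packing, and so on):

    import operator
    from dask.highlevelgraph import Layer

    class DoubleLayer(Layer):
        """Toy layer: output block i is twice input block i (hypothetical)."""

        def __init__(self, name, input_name, indices, annotations=None):
            super().__init__(annotations=annotations)
            self.name = name
            self.input_name = input_name
            self.indices = tuple(indices)  # which output blocks this layer holds

        # Metadata-only methods: these must NOT generate tasks.
        def get_output_keys(self):
            return {(self.name, i) for i in self.indices}

        def is_materialized(self):
            return False

        def __len__(self):
            return len(self.indices)

        def __iter__(self):
            return ((self.name, i) for i in self.indices)

        # Materialization: only called when low-level tasks are really required.
        def __getitem__(self, key):
            _, i = key
            return (operator.mul, (self.input_name, i), 2)

        # Culling: build a smaller layer from the requested keys alone.
        def cull(self, keys, all_hlg_keys):
            wanted = sorted(i for (name, i) in keys if name == self.name)
            culled = DoubleLayer(self.name, self.input_name, wanted)
            key_deps = {(self.name, i): {(self.input_name, i)} for i in wanted}
            return culled, key_deps

Such a layer would then be combined with others via HighLevelGraph(layers, dependencies) and plugged into a collection.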

Serializing high-level graphs

In addition to the algorithmic challenges of HighLevelGraph layers, there are also some serialization challenges. In particular, there is the expectation that HighLevelGraph code can run on the Dask distributed scheduler. This brings with it the requirement that no user code is called or unpickled when generating the graph.

In practice, this means that the implementer must do two things:

  1. Implement __dask_distributed_pack__() and __dask_distributed_unpack__() for each Layer, which handles serialization.
  2. Be very careful not to bundle user code into the layer logic (e.g., no callables for customizing task production from the layer); a brief illustration of this point follows the list.
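
As a loose illustration of point 2 (this is about the general principle, not the actual __dask_distributed_pack__ protocol): layer state should be plain, declarative data that the scheduler can unpickle without running user code, rather than user-supplied callables.

    import pickle

    # BAD: layer state carries a user callable; unpickling it on the scheduler
    # would drag user code onto the scheduler (or fail outright, as with this lambda).
    bad_state = {"func": lambda block: block + 1, "npartitions": 4}

    # BETTER: layer state is plain, declarative data; tasks are generated from
    # it only when the layer is materialized/unpacked.
    good_state = {"operation": "add", "operand": 1, "npartitions": 4}

    pickle.dumps(good_state)   # fine: pure data
    # pickle.dumps(bad_state)  # raises: lambdas cannot be pickled at all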

Moving forward on slicing and map_overlap

There are two work-in-progress layers started by Genevieve:

  1. A layer for map_overlap
  2. A layer for simple slicing of arrays with integers and slice objects.

Both of these work, but they are limited in utility because they don't have non-materializing implementations of the two important methods above, get_output_keys() and cull(). So the main action item for finishing the layers is to figure out how to implement those.

At a high level this means:

map_overlap:

This should be similar to Blockwise. However, the output keys are not just the output blocks but also all bordering blocks, so the implementation will necessarily be more complex. I think the way forward is to start with the Blockwise implementation of get_output_keys() and then augment it with the bordering keys. Once that is in place, culling should be easier, since we will already have logic for which keys we need.
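
To get a feel for the dependency structure, here is a small sketch; the identity function and the task counts are only illustrative, and the exact numbers depend on the Dask version and how the overlap is currently implemented:

    import dask.array as da

    x = da.arange(100, chunks=10)
    y = da.map_overlap(lambda block: block, x, depth=1, boundary="none")

    hlg = y.__dask_graph__()
    # Cull down to a single output block: the result should include that
    # block's bordering neighbours, but not tasks for the whole array.
    one_block = [y.__dask_keys__()[5]]
    culled = hlg.cull(set(one_block))
    print(len(dict(culled)), "tasks vs", len(dict(hlg)), "in the full graph")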

Simple Slicing:

In this case, there is a good start for get_output_keys() and cull() here, but they need to be finished.

Once a simple slicing layer is done, there are a couple of important high-level optimizations that should be added. In particular, we should try to re-implement this section of low-level optimizations:

  1. low-level fusion
  2. slicing optimizations

The slicing optimizations may actually be easier than the low-level implementation, since they could be done symbolically with slice objects.
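
For instance, a toy sketch of what "symbolic" might mean, limited to 1-D slices with positive steps (the fuse_slices helper is hypothetical, not part of Dask): two successive slices can be composed into one without ever touching tasks.

    import numpy as np

    def fuse_slices(outer: slice, inner: slice, length: int) -> slice:
        """Compose x[outer][inner] into a single slice (positive steps only)."""
        o_start, o_stop, o_step = outer.indices(length)
        outer_len = max(0, (o_stop - o_start + o_step - 1) // o_step)
        i_start, i_stop, i_step = inner.indices(outer_len)
        return slice(
            o_start + i_start * o_step,   # new start in the original coordinates
            o_start + i_stop * o_step,    # new stop
            o_step * i_step,              # steps multiply
        )

    x = np.arange(100)
    fused = fuse_slices(slice(10, 90), slice(5, 20), len(x))
    assert (x[10:90][5:20] == x[fused]).all()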

It's not currently clear to me what low-level fusion would look like outside of the slicing optimizations, but we should research it.

@GenevieveBuckley

Thanks for writing this up Ian, I think it's a very clear overview.

Quick links for convenience:

  1. A layer for map_overlap (PR "A HighLevelGraph abstract layer for map_overlap #7595")
  2. A layer for simple slicing arrays with integers and slice objects (PR "Array slicing HighLevelGraph layer #7655").
