Skip to content

Instantly share code, notes, and snippets.

@mrocklin
Last active August 22, 2016 02:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrocklin/ca6fd97fa17afce3b55cfc7d669dc621 to your computer and use it in GitHub Desktop.
Save mrocklin/ca6fd97fa17afce3b55cfc7d669dc621 to your computer and use it in GitHub Desktop.

Background

Wes published notes for a Pandas 2.0 design document. I thought I'd commment from the perspective of dask.dataframe.

Dask.dataframe is an out-of-core / parallel/ distributed library that logically coordinates many Pandas DataFrames. It replicates a decently large subset of the interface exactly, including elementwise operations, reductions, groupby (shuffle or indexed), merge (shuffle or indexed), timeseries operations (rolling, resample, cumfoo), etc..

I see this feedback being useful for two reasons:

  1. We may want to make a Dask.dataframe 2.0 on top of Pandas 2.0 at some point
  2. In building Dask.dataframe we had to thoroughly wrap and integrate with much of the Pandas API and abstractions in order to work as consistently as we do. An accounting of this experience might help to inform design decisions for the next time someone does this (regardless of whether it's Dask developers or otherwise).

This feedback is colored by the approach that Dask takes, which is to build a general parallel programming library first, and then build everything else on top of it. In particular, Dask.dataframe is a special case of Dask, which has a variety of other use cases. If you were to build a special library just to parallelize Pandas 2.0 then you might go about things quite differently.

Overview

Many of the points made within Wes's original document seem on point to me generally. However as a Dask developer I can ignore most of them happily. At the end of the day all I really care about are the following things:

  1. Predictable data abstractions without corner cases
  2. The ability to efficiently serialize data (turn into a bytestring/memoryview)
  3. The ability to serialize and call Python functions
  4. The ability to call multiple Python functions concurrently in a single process and still gain performance (releasing the GIL)
  5. The ability to reason about the size in bytes of dataframes

From a Dask/parallelizing perspective we can entirely ignore memory layout, C/C++ computational internals, etc. except in so far as they affect the above concerns.

Predictable Data Abstractions

Dask adopts the principle of no-creativity when it comes to wrapping other projects. That is we abide entirely by the semantic decisions made within that project (e.g. NaN for NA within NumPy). Given this principle, I actually don't have much to comment on the existing design document.

However, Dask is bound by these same decisions. Just as there exist pd.DataFrame and pd.Series objects there also exist dd.DataFrame and dd.Series objects with the same dtype structure. While we can often punt options and such down to the constituent Pandas objects we sometimes do need to do a bit of logic on our own, causing a repetition of the corner case pain referred to in the "Motivating Example" section:

if is_categorical_dtype(dtype):
    upcast_cls = 'category'
elif is_datetimetz(dtype):
    upcast_cls = 'datetimetz'
...

To the extent that this logic can be reduced with a consistent datatype extraction, memory layout, etc. is great. To the extent that it can not it would be convenient to identify and extract bits of logic like this into separate functions that can be called by downstream libraries. Hygiene like this is hard to guarantee in the practice of development of course, so elimination of this kind of code is a desirable.

Data Serialization

There are many attempts to efficiently serialize Pandas DataFrames. The current pickle solution is non-optimal, both in the sense that it doesn't think about compression and such, but also in the sense that it appears to be needlessly slow (although my benchmarks are old here). Custom attempts to serialize Pandas dataframes can be significantly faster with moderate effort.

At the same time, it is very nice if these custom solutions can be pushed into Python's standard protocols for this sort of thing, like the pickle protocols __getstate__ and __setstate__. This helps downstream parallel computing libraries avoid special casing Pandas objects.

Additionally to avoid copies, it is often nice to be directly handed memoryview or buffer objects. Dask itself doesn't actually operate in the no-copy regime (it has yet to be a bottleneck in any of our workloads) but other projects that are more performance conscious with regards to memory bandwidth prefer to avoid this. It would be nice to receive a small bytestring encoding metadata followed by a sequence of memoryview objects that can then be passed directly to a socket.

There is currently no standard protocol for this, however I suspect that we could get NumPy developers on board with a proposal easily. This, at the very least, would be a good bridge building activity.

Function Serialization

Because dask.dataframe builds on top of Pandas we frequently need to pass around Pandas functions across a wire. Tools like cloudpickle make this feasible for methods and dynamically constructed functions.

However, Pandas methods often undergo significant decoration and dynamic rewriting in ways that confuses cloudpickle. These can both cause failures and performance bottlenecks, when function serialization ends up taking significantly more time than the Pandas function being called.

In Process Parallelism

The concerns above are mostly important for distributed memory computing. However it is likely that any parallel Pandas variant will be more often used on a single machine with many cores and lots of RAM. In these cases it is advantageous to use shared memory within a single process. Over the last year the Pandas developers have been gracious enough to release the GIL in many performance critical sections. This gives us nice speedups and makes dask.dataframe on a workstation perform very nicely. I routinely get 20x speedups over in-memory Pandas on a 40 core machine running groupbys-aggregations on 50GB datasets. This isn't ideal, but is significant.

It would be nice to see this happen from the ground up rather than after-the-fact. A rewrite of a C/C++ core affords a nice opportunity here.

Pandas may prefer to engage in its own parallelism. If this occurs it would be nice to globally set a singled-threaded mode programmatically.

Reason about Size

Robust dynamic parallelism in a space constrained setting relies on tools to reason about the size, in bytes, that a piece of data takes in memory. Recent improvements to the memory_usage method help here although reliance on the Python Object dtype for text often causes underestimation, which can be troublesome.

Additionally, hooking into standard Python protocols, like __sizeof__ is appreciated to avoid special cases downstream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment