
@martindurant
Last active May 1, 2023 20:52
Dataframe column optimization
from dask.layers import DataFrameIOLayer
import pandas as pd


def find_columns(df):
    """Probe which columns each IO layer of ``df`` actually needs.

    For every IO layer, drop one column at a time from a mocked empty
    frame and re-run the graph: if the computation fails without that
    column, the column is required.
    """
    io_layers = {
        k: lay
        for k, lay in df.dask.layers.items()
        if isinstance(lay, DataFrameIOLayer)
    }
    required = {k: set() for k in io_layers}
    for k, io_lay in io_layers.items():
        allcols = io_lay.collection_annotations["series_dtypes"]
        # Empty frame carrying the layer's full schema
        meta = pd.DataFrame({c: pd.Series([], dtype=dt) for c, dt in allcols.items()})
        for col in allcols:
            mock = list(meta.columns)
            mock.remove(col)
            mocked = meta[mock]

            # Bind ``mocked`` as a default argument so each probe returns
            # the frame that is missing exactly one column.
            def min_df(*args, mocked=mocked, **kwargs):
                return mocked

            # Patch the layer's first task to return the mocked frame
            # instead of doing real IO.
            kk = list(io_lay.dsk)[0]
            val = io_lay.dsk[kk]
            io_lay.dsk[kk] = (min_df,) + val[1:]
            df.dask.layers[k] = io_lay
            try:
                df.compute(optimize_graph=False, scheduler="sync")
            except Exception:
                # The graph failed without this column, so it is required.
                required[k].add(col)
            finally:
                io_lay.dsk[kk] = val
    return required
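
A rough usage sketch (the file name and column names here are hypothetical; it assumes a Parquet dataset readable with dask.dataframe.read_parquet):

import dask.dataframe as dd

# Hypothetical example: build a small graph, then probe which columns
# its IO layer really needs.
ddf = dd.read_parquet("data.parquet")   # made-up path
total = ddf["a"] + ddf["b"]             # uses only columns "a" and "b"
print(find_columns(total))              # e.g. {"read-parquet-...": {"a", "b"}}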
@martindurant

Clearly this scales as:
(number of input columns) × (number of layers) × (number of partitions)

If someone can figure out the best way to select only the first partition throughout the graph, the partitions factor goes away.
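
One possible sketch of that idea (an assumption, not a tested fix): cull the unoptimized graph down to the tasks that feed the first output partition, then execute only those:

from dask.local import get_sync
from dask.optimization import cull

# Sketch: restrict execution to the first output partition so the probe
# cost no longer scales with the number of partitions.
dsk = dict(df.__dask_graph__())     # flatten the HighLevelGraph
first_key = (df._name, 0)           # key of the first output partition
culled, _ = cull(dsk, [first_key])  # drop tasks the first partition doesn't need
get_sync(culled, [first_key])       # run only the remaining tasks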
