@CMCDragonkai
Last active August 22, 2019 04:47
Dump Dask Dataframe to Single CSV #python #dask
import os
import shutil
import tempfile

import pandas as pd
import dask.dataframe as df
from pathlib import Path

output_path = Path('tmp')
output_path.mkdir(exist_ok=True)

results = {
    'images_dd': df.from_pandas(
        pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}),
        npartitions=2
    )
}

# DataFrame.to_csv forces immediate computation here (no separate compute()
# call needed); it returns the list of per-partition CSV paths it wrote
with tempfile.TemporaryDirectory() as tmpdir:
    images_csvs = results['images_dd'].to_csv(
        os.path.join(tmpdir, 'images*.csv'),
        header_first_partition_only=True,
        index=False)
    # concatenate the partition files, in order, into a single CSV
    with open(output_path / 'images.csv', 'wb') as output_f:
        for images_csv in images_csvs:
            with open(images_csv, 'rb') as images_csv_f:
                shutil.copyfileobj(images_csv_f, output_f)
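As an aside, recent Dask releases can do this merge themselves: to_csv accepts a single_file flag that writes all partitions into one file. A minimal sketch, assuming a Dask version that supports it:

# hedged sketch: assumes this Dask version's to_csv supports single_file
results['images_dd'].to_csv(
    str(output_path / 'images.csv'),
    single_file=True,
    index=False)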
# this preserves the laziness of the entire graph
import shutil
import tempfile

import dask
import natsort
import pandas as pd
import dask.dataframe as df
from pathlib import Path

tmp_path = Path(tempfile.gettempdir())
output_path = Path('tmp')
output_path.mkdir(exist_ok=True)

results = {
    'images_dd': df.from_pandas(
        pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}),
        npartitions=2
    )
}

def to_csv(df, filename):
    # df._name is the unique key of this dataframe in the dask graph,
    # so the partition files won't collide with other dumps
    csv_path_glob = df._name + '.*.csv'
    # compute=False keeps the write lazy: dump_csvs is a list of delayed
    # tasks, one per partition
    dump_csvs = df.to_csv(
        tmp_path / csv_path_glob,
        header_first_partition_only=True,
        index=False,
        compute=False)
    def combine(dump_csvs):
        with open(output_path / filename, 'wb') as output_fd:
            # this would be simplified if dump_csvs returned the paths;
            # natsorted keeps partitions in numeric order (2 before 10)
            for csv_path in natsort.natsorted(tmp_path.glob(csv_path_glob)):
                with open(csv_path, 'rb') as csv_fd:
                    shutil.copyfileobj(csv_fd, output_fd)
                csv_path.unlink()
        return output_path / filename
    # combine only runs once every partition dump has finished
    return dask.delayed(combine)(dump_csvs)

r = to_csv(results['images_dd'], 'images.csv')
print(type(r))  # dask.delayed.Delayed
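Nothing has executed at this point; computing the delayed object runs the whole graph (partition dumps, then combine) and hands back the output path:

# minimal usage sketch: trigger the dumps and the final merge
final_path = r.compute()
print(final_path)  # tmp/images.csv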
@CMCDragonkai (Author) commented:

This does a chunk-by-chunk in-memory copy to perform the final concatenation into one CSV file, so the final "reduce" step is a bottleneck: while it runs, you are no longer making use of Dask's parallelism. It also means we are relying on the filesystem to scale to the entire Dask dataframe, on the assumption that we don't have enough memory to hold it. If we did have the memory, none of this would be necessary; we would just compute the Dask dataframe into a Pandas dataframe and dump that to disk in one go.
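A minimal sketch of that in-memory alternative, reusing the results dict and output_path from above:

# only viable when the whole dataframe fits in memory
pdf = results['images_dd'].compute()  # collapses to a Pandas DataFrame
pdf.to_csv(output_path / 'images.csv', index=False)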
