Last active
August 22, 2019 04:47
-
-
Save CMCDragonkai/ad84a4096fa63bc278648db6a9f22a0a to your computer and use it in GitHub Desktop.
Dump Dask Dataframe to Single CSV #python #dask
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import tempfile
import shutil
import pandas as pd
import dask.dataframe as df
from pathlib import Path

# Directory the combined CSV is written into; create it up front so the
# open() below cannot fail with FileNotFoundError on a fresh checkout.
output_path = Path('tmp')
output_path.mkdir(parents=True, exist_ok=True)

# Toy Dask dataframe standing in for a real computation result.
results = {
    'images_dd': df.from_pandas(
        pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}),
        npartitions=2
    )
}

# DataFrame.to_csv forces immediate computation (no separate compute() call
# needed). It writes one CSV per partition into tmpdir and returns the file
# paths in partition order, so concatenating them in order yields one valid CSV.
with tempfile.TemporaryDirectory() as tmpdir:
    images_csvs = results['images_dd'].to_csv(
        os.path.join(tmpdir, 'images*.csv'),
        # only the first partition's file carries the header row
        header_first_partition_only=True,
        index=False)
    with open(output_path / 'images.csv', 'wb') as output_f:
        for images_csv in images_csvs:
            with open(images_csv, 'rb') as images_csv_f:
                # binary copy preserves bytes exactly and streams in chunks
                shutil.copyfileobj(images_csv_f, output_f)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This variant preserves the laziness of the entire task graph: both the
# per-partition CSV dump and the final concatenation are deferred until the
# delayed result is actually computed.
import tempfile
import shutil
import natsort
import dask
import pandas as pd
import dask.dataframe as df
from pathlib import Path

# Scratch area for the per-partition CSV files.
tmp_path = Path(tempfile.gettempdir())
# Destination directory for the combined CSV; create it eagerly so the
# deferred combine step cannot fail with FileNotFoundError at compute time.
output_path = Path('tmp')
output_path.mkdir(parents=True, exist_ok=True)

# Toy Dask dataframe standing in for a real computation result.
results = {
    'images_dd': df.from_pandas(
        pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}),
        npartitions=2
    )
}
def to_csv(df, filename):
    """Lazily dump Dask dataframe ``df`` to a single CSV at output_path/filename.

    Returns a dask.delayed object; nothing is written until it is computed.
    """
    # NOTE: the parameter `df` shadows the module alias `dask.dataframe as df`;
    # the module itself is not needed inside this function.
    # df._name is a private-but-stable per-graph identifier; using it keeps the
    # scratch files of different dataframes from colliding in tmp_path.
    csv_path_glob = df._name + '.*.csv'
    # compute=False keeps the per-partition CSV writes as pending tasks
    # instead of executing them immediately.
    dump_csvs = df.to_csv(
        tmp_path / csv_path_glob,
        header_first_partition_only=True,
        index=False,
        compute=False)

    def combine(dump_csvs):
        # Ensure the destination directory exists so open() cannot fail with
        # FileNotFoundError when the graph is finally computed.
        output_path.mkdir(parents=True, exist_ok=True)
        with open(output_path / filename, 'wb') as output_fd:
            # This would be simpler if dump_csvs yielded the written paths.
            # Natural sort keeps partitions in numeric order once there are
            # more than 9 of them (lexicographic sort puts '.10' before '.2').
            for csv_path in natsort.natsorted(tmp_path.glob(csv_path_glob)):
                with open(csv_path, 'rb') as csv_fd:
                    shutil.copyfileobj(csv_fd, output_fd)
                # scratch file is no longer needed once appended
                csv_path.unlink()
        return output_path / filename

    # Passing dump_csvs as an argument makes the delayed combine step wait
    # for every partition write before it runs.
    return dask.delayed(combine)(dump_csvs)
# Build the lazy single-file dump; the result is a dask.delayed object,
# not a path — nothing has been written yet.
result = to_csv(results['images_dd'], 'images.csv')
print(type(result))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This results in a chunk-by-chunk in-memory copy to perform the final concatenation into one CSV file, which makes the final "reduce" step a bottleneck: during that step, Dask's parallelism is no longer being exploited. It also means we are relying on the filesystem being able to hold the entire Dask dataframe, on the assumption that we don't have enough memory to process the whole dataframe at once. If we did have enough memory, we wouldn't need this approach at all — we would simply compute the Dask dataframe into a Pandas dataframe and dump that to disk in one write.