makmanalp / stata_dask.py
Last active Oct 29, 2018

Read STATA .dta files chunk by chunk (streaming) into dask with pandas's read_stata / StataReader and some hackery
import dask.dataframe as dd
from dask.dataframe.utils import make_meta
from dask.delayed import delayed
import pandas as pd
from itertools import chain


def get_stata_dask_meta(file_name, meta_chunksize=10000, *args, **kwargs):
    """Load up the first bit of the file for type metadata info. We have to
    resort to chunksize trickery because the read_stata function doesn't have
    an "nrows" option."""
    meta_df = next(pd.read_stata(file_name, *args, chunksize=meta_chunksize, **kwargs))
    return make_meta(meta_df)


def read_stata_with_offset(file_name, start_row=0, nrows=None, *args, **kwargs):
    reader = pd.read_stata(file_name, iterator=True, *args, **kwargs)
    # Reset the reader's position to the start row. This pokes at a private
    # StataReader attribute: this is the "hackery" part.
    reader._lines_read = start_row
    return reader.read(nrows=nrows)


@delayed(pure=True)
def dask_read_stata_delayed_simple(*args, **kwargs):
    """Swap the chunked version with this to see the difference."""
    return pd.read_stata(*args, **kwargs)


def dask_read_stata_delayed_chunked(file_name, delayed_chunksize=1000000, *args, **kwargs):
    """Return a list of delayed objects, each of which will return a chunk of
    the given stata file."""
    # Get the total number of rows
    reader = pd.read_stata(file_name, iterator=True, *args, **kwargs)
    num_rows = reader.nobs
    reader.close()

    # Delayed stata chunk reader
    delayed_read = delayed(read_stata_with_offset, pure=True)

    delayeds = []
    for chunk_offset in range(0, num_rows, delayed_chunksize):
        chunk_task = delayed_read(file_name,
                                  start_row=chunk_offset,
                                  nrows=delayed_chunksize,
                                  *args, **kwargs)
        delayeds.append(chunk_task)
    return delayeds


def dask_read_stata_delayed_group(file_names, meta=None, meta_chunksize=10000, *args, **kwargs):
    # Create delayed chunk-reading tasks for every file
    delayeds = [dask_read_stata_delayed_chunked(f, *args, **kwargs) for f in file_names]
    # Type metadata, inferred from the first file unless explicitly supplied
    if meta is None:
        meta = get_stata_dask_meta(file_names[0], *args, meta_chunksize=meta_chunksize, **kwargs)
    return dd.from_delayed(list(chain.from_iterable(delayeds)), meta=meta)


# Usage:
#
# from dask.diagnostics import ProgressBar
#
# files = ["a.dta", "b.dta"]
# df = dask_read_stata_delayed_group(files)
# agg = df.groupby(["exporter", "year"]).export_value.sum()
# agg.visualize()
# with ProgressBar():
#     result = agg.compute()
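#
# For comparison, a minimal sketch (assuming the same `files` list as above) of
# the non-chunked variant that dask_read_stata_delayed_simple's docstring
# alludes to, with one delayed object and hence one dask partition per file:
#
# simple_delayeds = [dask_read_stata_delayed_simple(f) for f in files]
# df_simple = dd.from_delayed(simple_delayeds, meta=get_stata_dask_meta(files[0]))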
makmanalp commented Mar 1, 2018

Behold, in all its glory, the task graph:

[task graph image: mydask]
