Skip to content

Instantly share code, notes, and snippets.

@canimus
Created December 15, 2019 20:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save canimus/123d153c631b2a2c8f0e0379fc78bd3a to your computer and use it in GitHub Desktop.
Save canimus/123d153c631b2a2c8f0e0379fc78bd3a to your computer and use it in GitHub Desktop.
DaskDataFrame Collector Parquet
def extract_data(query_var, file_prefix, *, start="2019-01-01", months=12,
                 connection=None, out_dir="parquet"):
    """Write one Parquet file per month of a SQL query, then align column dtypes.

    For each of ``months`` consecutive months beginning at ``start``, the first
    day of the month and the first day of the following month are substituted
    into ``query_var`` via ``str.format``. The query result is written to
    ``{out_dir}/{file_prefix}_{NN}.parquet``, where ``NN`` is the 1-based month
    number zero-padded to two digits.

    Each month is read independently, so pandas may infer a different dtype for
    the same column in different months. For example, an integer column comes
    back as float64 in a month that contains NULLs. Once all files are written,
    every column whose dtype differs between months is cast to the dtype seen
    most often, and only the offending files are rewritten. On a tie, the
    earliest month's dtype wins.

    Args:
        query_var: SQL template with two positional ``{}`` placeholders
            (period start, next period start). It is filled with
            ``str.format``, so it must come from trusted code, not user input.
        file_prefix: Prefix of the output file names.
        start: First month (anything ``pd.date_range`` accepts; rolled forward
            to a month start). Defaults to the original ``"2019-01-01"``.
        months: Number of monthly files to produce. Defaults to 12.
        connection: Any connection accepted by ``pd.read_sql``. When ``None``,
            the module-level ``conn`` is used, as in the original code.
        out_dir: Output directory; created if it does not exist.

    Returns:
        None. The output is the set of Parquet files. A ``Fixing: <column>``
        line is printed for every column whose dtype had to be reconciled.
        NULLs in a column cast to a NumPy integer dtype are replaced by -1.
    """
    from collections import Counter
    from pathlib import Path

    if connection is None:
        # Backward compatible: the original always read from the global `conn`.
        connection = conn

    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)  # avoid failing on a missing dir

    def _path(month_no):
        """Return the Parquet path for 1-based month number ``month_no``."""
        return out / f"{file_prefix}_{str(month_no).zfill(2)}.parquet"

    # months + 1 month-start boundaries -> `months` consecutive (lo, hi) pairs.
    bounds = (
        pd.date_range(start=start, periods=max(months, 0) + 1, freq="MS")
        .strftime("%Y-%m-%d")
        .tolist()
    )

    per_month = []
    for month_no, (lo, hi) in enumerate(zip(bounds, bounds[1:]), start=1):
        frame = pd.read_sql(query_var.format(lo, hi), connection)
        per_month.append(frame.dtypes.astype(str))
        frame.to_parquet(_path(month_no))
    if not per_month:
        return  # nothing was extracted, so there is nothing to reconcile

    # Rows = months (row i <-> file i + 1), columns = source columns,
    # cells = dtype names. Building from Series aligns the columns by label,
    # so a month whose columns come back in another order still compares
    # like-for-like.
    dtype_table = pd.DataFrame(per_month).reset_index(drop=True)

    distinct = dtype_table.nunique(axis=0)
    for column in distinct[distinct > 1].index:
        print(f"Fixing: {column}")
        observed = dtype_table[column]
        present = observed.notna()  # False where a month lacks this column
        # Majority dtype name; Counter keeps first-seen order for ties.
        target_name = Counter(observed[present]).most_common(1)[0][0]
        # pandas_dtype also understands pandas-only names ("category", ...).
        target = pd.api.types.pandas_dtype(target_name)
        # NaN cannot be stored in a NumPy integer array, so integer targets get
        # the -1 placeholder. Float/object/datetime targets keep their NULLs
        # instead of having -1 injected as a fake value.
        fill_nulls = isinstance(target, np.dtype) and target.kind in "iu"
        mismatched = [row for row, name in observed[present].items()
                      if name != target_name]
        for row in mismatched:
            path = _path(row + 1)
            month = pd.read_parquet(path)
            values = month[column].fillna(-1) if fill_nulls else month[column]
            month[column] = values.astype(target)
            month.to_parquet(path)
# Extract the `docs` query month by month into Parquet files prefixed "doc".
# NOTE(review): `docs` is defined elsewhere; presumably a SQL template with two
# `{}` date placeholders. Confirm against its definition.
extract_data(query_var=docs, file_prefix="doc")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment