@austospumanto · Created July 23, 2019
Some Python Pandas Utility Functions (Serialization, Data Type Casting)
"""
pip install \
pandas \
sqlalchemy \
pyarrow
"""
import json
import os
import time
from functools import wraps
from pathlib import Path
from typing import Set
from typing import Type
from typing import Union

import numpy as np
import pandas as pd
from sqlalchemy import Table

ENGINE = "pyarrow"
PD_NULLABLE_INT_TYPE = "Int64"


def timeit():
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            start_time = time.time()
            res = func(*args, **kwargs)
            interval = time.time() - start_time
            print("Time for '%s': %0.3f seconds" % (func.__qualname__, interval))
            return res

        return inner

    return outer


def read_pd_dtypes(df_fp: Path) -> pd.Series:
    assert isinstance(df_fp, Path) and "dtypes.pkl" not in str(df_fp), df_fp
    return pd.read_pickle(str(df_fp) + ".dtypes.pkl")


def write_pd_dtypes(df_fp: Path, df: pd.DataFrame) -> None:
    assert isinstance(df_fp, Path) and "dtypes.pkl" not in str(df_fp), df_fp
    assert isinstance(df, pd.DataFrame), type(df)
    df.dtypes.to_pickle(str(df_fp) + ".dtypes.pkl")


class Dtypes:
    object = np.dtype("O")


def read_pickle(filepath: Path) -> pd.DataFrame:
    assert isinstance(filepath, Path), filepath
    return pd.read_pickle(str(filepath))


def to_pickle(df, filepath: Path):
    assert isinstance(filepath, Path), filepath
    if not filepath.parent.exists():
        os.makedirs(str(filepath.parent))
    df.to_pickle(str(filepath))
    print(f"Wrote to {filepath}")


def to_feather(df_: pd.DataFrame, filepath: Path) -> None:
    # Feather cannot store pandas' nullable Int64 columns directly, so cast
    # them to float for the write and record their names in a sidecar file.
    nullable_ints = get_nullable_int_cols(df_)
    df_.astype(dtype={c: float for c in nullable_ints}).to_feather(str(filepath))
    print(f"Wrote to {filepath}")
    nullable_ints__fout = str(filepath) + ".nullints.json"
    Path(nullable_ints__fout).write_text(
        json.dumps(nullable_ints, indent=4, sort_keys=True)
    )
    print(f"Wrote to {nullable_ints__fout}")


def read_feather(filepath: Path) -> pd.DataFrame:
    # Restore the nullable Int64 columns recorded by to_feather().
    nullable_ints__fin = Path(str(filepath) + ".nullints.json")
    nullable_ints = json.loads(nullable_ints__fin.read_text())
    assert isinstance(nullable_ints, list)
    return pd.read_feather(str(filepath)).astype(
        dtype={c: PD_NULLABLE_INT_TYPE for c in nullable_ints}
    )
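

# A minimal round-trip sketch (illustrative only; the DataFrame and path below
# are hypothetical). It shows why the ".nullints.json" sidecar exists: Feather
# drops the nullable Int64 dtype, and read_feather() restores it from the sidecar.
#
#     df = pd.DataFrame({"a": pd.array([1, None, 3], dtype="Int64"), "b": [1.0, 2.0, 3.0]})
#     to_feather(df, Path("/tmp/demo.feather"))
#     df2 = read_feather(Path("/tmp/demo.feather"))
#     assert df2["a"].dtype == pd.Int64Dtype()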


def to_csv(df: pd.DataFrame, filepath: Path, opened_file=None) -> None:
    assert isinstance(filepath, Path), filepath
    # Persist the column dtypes alongside the CSV so read_csv() can restore them.
    write_pd_dtypes(df=df, df_fp=filepath)
    df.reset_index().to_csv(opened_file or str(filepath), index=False)


def append_to_csv(df: pd.DataFrame, filepath: Path, opened_file=None) -> None:
    assert isinstance(filepath, Path), filepath
    assert filepath.exists(), filepath
    df.reset_index().to_csv(
        opened_file or str(filepath), index=False, mode="a", header=False
    )


def read_csv(filepath: Path) -> pd.DataFrame:
    assert isinstance(filepath, Path), filepath
    # Datetime columns cannot be passed in read_csv's dtype mapping, so they
    # are split out into a parse_dates list instead.
    dtype, parse_dates = split_out_parse_dates(read_pd_dtypes(filepath))
    return pd.read_csv(
        filepath, dtype=dtype, keep_default_na=True, parse_dates=parse_dates
    )
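

# A hedged usage sketch of the CSV round trip (path and data are hypothetical).
# to_csv() writes a ".dtypes.pkl" sidecar next to the CSV, and read_csv() reads
# it back so dtypes like category survive the trip:
#
#     df = pd.DataFrame({"city": pd.Categorical(["SF", "LA"]), "n": [1, 2]})
#     to_csv(df, Path("/tmp/demo.csv"))
#     df2 = read_csv(Path("/tmp/demo.csv"))
#     assert df2["city"].dtype == "category"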


def get_nullable_int_cols(cls: Union[pd.DataFrame, Type, Table]):
    # noinspection PyUnresolvedReferences
    return [
        colname
        for colname, coltype in cls.dtypes.items()
        if isinstance(coltype, pd.Int64Dtype)
    ]


def to_parquet(df_: pd.DataFrame, filepath: Path) -> None:
    # Same sidecar trick as to_feather(): cast nullable Int64 columns to float
    # for the write and record their names so read_parquet() can restore them.
    nullable_ints = get_nullable_int_cols(df_)
    df_.astype(dtype={c: float for c in nullable_ints}).to_parquet(
        filepath, index=False, engine=ENGINE
    )
    print(f"Wrote to {filepath}")
    nullable_ints__fout = str(filepath) + ".nullints.json"
    Path(nullable_ints__fout).write_text(
        json.dumps(nullable_ints, indent=4, sort_keys=True)
    )
    print(f"Wrote to {nullable_ints__fout}")


def read_parquet(filepath: Path) -> pd.DataFrame:
    nullable_ints__fin = Path(str(filepath) + ".nullints.json")
    nullable_ints = json.loads(nullable_ints__fin.read_text())
    assert isinstance(nullable_ints, list)
    return pd.read_parquet(path=str(filepath), engine=ENGINE).astype(
        dtype={c: PD_NULLABLE_INT_TYPE for c in nullable_ints}
    )
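

# Note that read_parquet() assumes the ".nullints.json" sidecar written by
# to_parquet() exists; reading a parquet file produced elsewhere would raise
# FileNotFoundError. A hedged sketch of a tolerant variant (hypothetical
# helper, not part of the original gist):
#
#     def read_parquet_or_plain(filepath: Path) -> pd.DataFrame:
#         sidecar = Path(str(filepath) + ".nullints.json")
#         cols = json.loads(sidecar.read_text()) if sidecar.exists() else []
#         return pd.read_parquet(str(filepath), engine=ENGINE).astype(
#             dtype={c: PD_NULLABLE_INT_TYPE for c in cols}
#         )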


def split_out_parse_dates(pd_dtypes: pd.Series):
    parse_dates = []
    new_pd_dtypes = {}
    for colname, coltype in pd_dtypes.items():
        try:
            is_dtt = np.issubdtype(coltype, np.dtype("datetime64[ns]"))
        except TypeError:
            # Extension dtypes (e.g. CategoricalDtype) are not numpy dtypes.
            is_dtt = False
        if is_dtt:
            parse_dates.append(colname)
        else:
            new_pd_dtypes[colname] = coltype
    return new_pd_dtypes, parse_dates
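

# Illustrative behavior (hypothetical data): datetime columns move to the
# parse_dates list; everything else stays in the dtype mapping.
#
#     dtypes = pd.Series({"ts": np.dtype("datetime64[ns]"), "n": np.dtype("int64")})
#     split_out_parse_dates(dtypes)  # -> ({"n": dtype('int64')}, ["ts"])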


def get_tablename_from_filepath(fp: Path) -> str:
    # Expects filenames of the form "<prefix>.<tablename>.<extension>".
    fp_name_parts = fp.name.split(".")
    assert len(fp_name_parts) == 3, fp_name_parts
    return fp_name_parts[1]
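

# Example (hypothetical filename following the expected convention):
#
#     get_tablename_from_filepath(Path("export.users.csv"))  # -> "users"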


def configure_csv_field_size_limit():
    """
    To avoid this error:
    --
    Traceback (most recent call last):
      File "$PROJECT_ROOT_DIR/.venv/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
        exec(code_obj, self.user_global_ns, self.user_ns)
      File "<ipython-input-5-263240bbee7e>", line 1, in <module>
        main()
      File "<ipython-input-4-1e6205fa58ad>", line 68, in main
        load_into_bigquery(fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 321, in load_into_bigquery
        theschema = genschema2(fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 106, in genschema2
        schema = run_schema_generator(sg, fp)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/utils/timing.py", line 27, in inner
        return func(*args, **kwargs)
      File "$PROJECT_ROOT_DIR/$PROJECT_PACKAGE_NAME/bq_schema.py", line 156, in run_schema_generator
        schema_map, error_logs = sg.deduce_schema(file=fin)
      File "$PROJECT_ROOT_DIR/.venv/lib/python3.7/site-packages/bigquery_schema_generator/generate_schema.py", line 168, in deduce_schema
        for json_object in reader:
      File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/csv.py", line 112, in __next__
        row = next(self.reader)
    _csv.Error: field larger than field limit (131072)
    """
    import sys
    import csv

    max_int = sys.maxsize
    while True:
        # Decrease max_int by a factor of 10 for as long as
        # csv.field_size_limit raises OverflowError.
        try:
            csv.field_size_limit(max_int)
            break
        except OverflowError:
            max_int = int(max_int / 10)
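

# Call once at startup, before parsing CSVs that may contain very large fields
# (file name below is hypothetical):
#
#     import csv
#     configure_csv_field_size_limit()
#     with open("big_fields.csv") as fin:
#         for row in csv.reader(fin):
#             ...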


def downcast_numeric_dtypes(
    df: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
    if isinstance(df, pd.Series):
        if df.dtype == "int":
            return pd.to_numeric(df, downcast="integer")
        elif df.dtype == "float":
            return pd.to_numeric(df, downcast="float")
        else:
            return df
    else:
        # Note: pd.concat reassembles the frame, so column order may change
        # relative to the input.
        return pd.concat(
            [
                df.select_dtypes(exclude=["int", "float"]),
                df.select_dtypes(include="int").apply(
                    pd.to_numeric, downcast="integer"
                ),
                df.select_dtypes(include="float").apply(
                    pd.to_numeric, downcast="float"
                ),
            ],
            axis="columns",
        )
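

# A small illustrative check (hypothetical data): int64 values that fit in
# int8 are narrowed, cutting per-column memory roughly 8x.
#
#     s = pd.Series([1, 2, 3], dtype="int64")
#     downcast_numeric_dtypes(s).dtype  # -> dtype('int8')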


@timeit()
def downcast_category_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    return pd.concat(
        [
            df.select_dtypes(exclude=["object", "category"]),
            # Convert object columns to category only when the ratio of unique
            # values is low enough for the conversion to save memory.
            df.select_dtypes(include="object").apply(
                lambda s: s.astype("category") if s.nunique() / s.shape[0] < 0.5 else s
            ),
            df.select_dtypes(include="category").apply(
                lambda s: s.cat.remove_unused_categories()
            ),
        ],
        axis="columns",
    )


@timeit()
def downcast_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # Combines the numeric and category downcasts above in one pass.
    return pd.concat(
        [
            df.select_dtypes(exclude=["int", "float", "object", "category"]),
            df.select_dtypes(include="int").apply(pd.to_numeric, downcast="integer"),
            df.select_dtypes(include="float").apply(pd.to_numeric, downcast="float"),
            df.select_dtypes(include="object").apply(
                lambda s: s.astype("category") if s.nunique() / s.shape[0] < 0.5 else s
            ),
            df.select_dtypes(include="category").apply(
                lambda s: s.cat.remove_unused_categories()
            ),
        ],
        axis="columns",
    )
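

# Illustrative usage (hypothetical frame): low-cardinality object columns become
# category and wide numerics are narrowed; memory_usage shows the effect.
#
#     df = pd.DataFrame({"code": ["a", "b"] * 50_000, "n": range(100_000)})
#     small = downcast_dtypes(df)
#     small.dtypes  # code -> category, n -> int32 (values fit in 32 bits)
#     df.memory_usage(deep=True).sum() > small.memory_usage(deep=True).sum()  # True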


def get_dt_colnames(df: pd.DataFrame) -> Set[str]:
    return set(df.dtypes[df.dtypes == np.dtype("<M8[ns]")].index.values)


def assert_integer_dtype(dtype) -> None:
    assert np.issubdtype(dtype, np.integer), dtype


def boolcols2dtype(df: pd.DataFrame, dtype) -> pd.DataFrame:
    prefix = f"In {boolcols2dtype.__qualname__} --"
    # A column qualifies if every one of its unique values is boolean-like
    # (NaN, True/False, or 0/1 in int or float form).
    mask = df.select_dtypes(["category", object, bool, int]).apply(
        lambda col: pd.Series(col.unique())
        .isin([np.nan, True, False, 0, 1, 0.0, 1.0])
        .all()
    )
    print(f"{prefix} mask={mask}")
    colnames = mask[mask].index
    return df.assign(**{colname: df[colname].astype(dtype) for colname in colnames})
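

# Hedged usage sketch (hypothetical data): 0/1 integer columns are recast while
# genuinely numeric columns are left alone.
#
#     df = pd.DataFrame({"flag": [0, 1, 1], "count": [3, 7, 9]})
#     boolcols2dtype(df, bool).dtypes  # flag -> bool, count -> int64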