Created
February 22, 2022 16:24
-
-
Save jonashaag/67f64e9fd582758f4d88b02de6712583 to your computer and use it in GitHub Desktop.
Snowflake fix fetch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def snowflake_fix_fetch(
    df,
    *,
    parse_variant_columns=(),
    lower_column_names=True,
    convert_int64=True,
):
    """Post-process a dataframe that was fetched from Snowflake.

    The following fixes are applied, in this order:

    1. Lower-case all column names (only if ``lower_column_names``).
    2. Convert all intXX dtypes to int64 (only if ``convert_int64``).
    3. Parse the VARIANT columns listed in ``parse_variant_columns``.

    Every helper is invoked with ``inplace=True``, so the dataframe passed
    in is modified directly rather than copied.

    Args:
        df: Dataframe to fix up.
        parse_variant_columns: Names of the columns to JSON-parse. Because
            parsing runs after the renaming step, the names have to match
            the columns as they are called at that point.
        lower_column_names: Whether to lower-case the column names.
        convert_int64: Whether to convert integer columns to int64.

    Returns:
        The result of ``snowflake_parse_variant_columns`` for ``df``.
    """
    if lower_column_names:
        # Return value deliberately unused: the rename happens in place.
        snowflake_lower_column_names(df, inplace=True)

    if convert_int64:
        snowflake_convert_int_subtypes(df, inplace=True)

    # Always called; with the default empty tuple there is nothing to parse.
    return snowflake_parse_variant_columns(
        df,
        parse_variant_columns,
        inplace=True,
    )
def snowflake_lower_column_names(df, *, inplace=False):
    """Rename all columns of ``df`` to their lower-case form.

    Snowflake returns column names in all-uppercase but we use all-lowercase.

    Args:
        df: Dataframe whose columns are renamed.
        inplace: If true, rename the columns of ``df`` itself; otherwise
            return a renamed dataframe and leave ``df`` untouched.

    Returns:
        The dataframe with lower-case column names. When ``inplace`` is true
        this is ``df`` itself, so the return value is usable in both modes.
    """
    # NOTE(review): recent pandas versions appear to deprecate the ``copy``
    # keyword of ``rename`` -- confirm against the pandas version in use.
    if inplace:
        # ``DataFrame.rename(..., inplace=True)`` returns None, so hand the
        # mutated dataframe back explicitly.
        df.rename(columns=str.lower, copy=False, inplace=True)
        return df
    return df.rename(columns=str.lower, copy=False)
def snowflake_convert_int_subtypes(df, *, target_dtype="int64", inplace=False):
    """Cast every integer-typed column of ``df`` to ``target_dtype``.

    Snowflake returns integers in the smallest subtype that has enough space
    for all of the data, i.e. any of int{8,16,32,64}. We prefer the int64
    dtype most of the time for ease of use.

    Args:
        df: Dataframe to convert.
        target_dtype: Dtype that every integer column is cast to.
        inplace: If true, modify ``df`` itself; otherwise work on a copy.

    Returns:
        The converted dataframe (``df`` itself when ``inplace`` is true).
    """
    is_integer_dtype = pd.api.types.is_integer_dtype
    is_dtype_equal = pd.api.types.is_dtype_equal

    # Only touch columns that actually need a cast. Non-integer columns and
    # integer columns that already have the target dtype are left alone
    # instead of being re-assigned through a no-op ``astype``.
    columns_to_cast = [
        col
        for col, old_dtype in df.dtypes.items()
        if is_integer_dtype(old_dtype)
        and not is_dtype_equal(old_dtype, target_dtype)
    ]

    if not inplace:
        df = df.copy()
    for col in columns_to_cast:
        df[col] = df[col].astype(target_dtype)
    return df
def snowflake_parse_variant_columns(df, columns, *, inplace=False):
    """Deserialize the JSON-encoded ``columns`` of ``df``.

    Snowflake returns non-primitive columns such as arrays as JSON strings,
    but we want them in their deserialized form.

    Args:
        df: Dataframe holding the JSON string columns.
        columns: Iterable of the column names to parse.
        inplace: If true, overwrite the columns of ``df`` itself; otherwise
            operate on a copy and leave ``df`` untouched.

    Returns:
        The dataframe with the parsed columns (``df`` itself when
        ``inplace`` is true).
    """
    # Columns are parsed one after another on purpose: doing it concurrently
    # does not yield any speedup, because of the GIL (when using threads) or
    # serialization overhead (when using processes).
    target = df if inplace else df.copy()
    for name in columns:
        target[name] = _parse_json_column(target[name])
    return target
def _parse_json_column(col):
    """Parse each JSON string of the series ``col`` into a Python object.

    Falsy entries (``None``, empty string) are mapped to ``None`` instead of
    being handed to the JSON parser.

    Args:
        col: Series of JSON strings.

    Returns:
        A series with the deserialized values.
    """
    import json

    # Pandas ships with ujson, which is slightly faster than the built-in
    # json module. The attribute it is exposed under differs between pandas
    # versions (``loads`` vs. ``ujson_loads``), so look it up defensively and
    # fall back to the standard library when neither is available.
    pandas_json = pd.io.json
    loads = (
        getattr(pandas_json, "ujson_loads", None)
        or getattr(pandas_json, "loads", None)
        or json.loads
    )
    return col.apply(lambda s: None if not s else loads(s))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment