Skip to content

Instantly share code, notes, and snippets.

@samukweku
Last active April 16, 2022 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samukweku/9da7593cecc3aa0408b6e55862e39fb1 to your computer and use it in GitHub Desktop.
Save samukweku/9da7593cecc3aa0408b6e55862e39fb1 to your computer and use it in GitHub Desktop.
Idea for wide_to_long implementation in datatable
from datatable import dt, f
from typing import Pattern, NamedTuple, Union
from collections import Counter, defaultdict
from itertools import compress, chain
import re
import numpy as np
class measure(NamedTuple):
    """Reshape either with a separator or a regular expression.

    Describes how the labels of the measured columns are broken up into
    new columns. ``melt`` builds this from a ``variable_name`` tuple and
    requires exactly one of ``sep`` or ``pattern`` to be provided.
    """

    # Name(s) of the new column(s) built from the pieces of each label.
    # The special name '.value' marks the piece that is kept as a column
    # header instead of being turned into data.
    column_names: Union[str, list]
    # Separator (string or compiled regex) used with ``re.split`` on each label.
    sep: Union[str, Pattern, None] = None
    # Regular expression whose groups yield the pieces of each label.
    pattern: Union[str, Pattern, None] = None
def melt(data, id_vars=None, measure_vars=None, variable_name="variable", value_name="value"):
    """
    Turns Frame from wide to long form.

    Parameters
    ----------
    data
        Frame to reshape. The code relies on its ``names``, ``ncols`` and
        ``nrows`` attributes and on column selection via ``data[:, labels]``.
    id_vars : str, list or tuple, optional
        Label(s) of the columns kept as identifiers; they are repeated for
        every block of stacked rows. When omitted and ``measure_vars`` covers
        only part of the columns, the remaining columns become identifiers.
    measure_vars : str, list or tuple, optional
        Label(s) of the columns to stack. Defaults to every column that is
        not in ``id_vars`` (or to all columns when ``id_vars`` is empty).
    variable_name : str, compiled regex, dict or tuple
        * str: name of the new column holding the original column labels.
        * compiled regex with named groups: every named group becomes a new
          column, filled with the text that group matched in each label.
        * dict: maps a new column name to a regex without groups; the
          columns whose label matches the regex are stacked into that column.
        * tuple: ``(column_names, sep, pattern)``, unpacked through
          ``measure``. Labels are split with ``sep`` or matched with
          ``pattern``; a piece named ``'.value'`` is kept as a column header.
    value_name : str
        Name of the column holding the stacked values whenever no
        ``'.value'`` piece is involved.

    Returns
    -------
    The reshaped Frame. ``data`` itself is returned when every column is
    listed in ``id_vars``.

    Raises
    ------
    TypeError
        If an argument has an unsupported type.
    ValueError
        If labels are missing, duplicated, clash with ``id_vars``, or a
        regular expression does not fit the column labels.
    """
    if id_vars:
        if not isinstance(id_vars, (str, list, tuple)):
            raise TypeError('id_vars should be one of str, list, tuple.')
        if isinstance(id_vars, str):
            id_vars = [id_vars]
        checks = set(id_vars).difference(data.names)
        if checks:
            raise ValueError(f'Labels {checks} in id_vars do not exist in the column names.')
        # Nothing is left to melt when every column is an identifier.
        if not set(data.names).difference(id_vars):
            return data
        checks = [key for key, value in Counter(id_vars).items() if value > 1]
        if checks:
            raise ValueError(f"Labels {checks} are duplicated in id_vars.")
        if not measure_vars:
            measure_vars = [name for name in data.names if name not in id_vars]

    if measure_vars:
        if not isinstance(measure_vars, (str, list, tuple)):
            raise TypeError('measure_vars should be one of str, list, tuple.')
        if isinstance(measure_vars, str):
            measure_vars = [measure_vars]
        checks = set(measure_vars).difference(data.names)
        if checks:
            raise ValueError(f'Labels {checks} in measure_vars do not exist in the column names.')
        checks = [key for key, value in Counter(measure_vars).items() if value > 1]
        if checks:
            raise ValueError(f"Labels {checks} are duplicated in measure_vars.")
        # Columns that are not measured act as identifiers.
        if (not id_vars) and (len(measure_vars) < data.ncols):
            id_vars = [name for name in data.names if name not in measure_vars]
    else:
        measure_vars = data.names

    def reshape_no_dot(measure_vars, output, data, id_vars=None):
        """If there is no .value, to keep that section of the column as a header.

        Stacks every measured column into a single ``value_name`` column and
        places it next to ``output`` (the already repeated label columns).
        """
        values = []
        for frame in data[:, measure_vars]:
            frame.names = [value_name]
            values.append(frame)
        values = dt.rbind(values, force=True)
        if id_vars:
            # One copy of the identifier columns per stacked measured column.
            id_vars = dt.repeat(data[:, id_vars], len(measure_vars))
            return dt.cbind([id_vars, output, values], force=True)
        return dt.cbind([output, values], force=True)

    def reshape_dot(column_names, data, measure_vars, output, id_vars=None):
        """Reshape if '.value' is present in the column names.

        ``output`` holds, per measured column, the pieces extracted from its
        label. Pieces aligned with '.value' name the resulting value columns;
        the remaining pieces become data in the other ``column_names``.
        """
        is_value = [name == '.value' for name in column_names]
        dot_value = [[*compress(extract, is_value)] for extract in output]
        if len(dot_value[0]) > 1:
            # Several '.value' pieces are glued together into one header.
            dot_value = ["".join(extract) for extract in dot_value]
        else:
            dot_value = [*chain.from_iterable(dot_value)]
        checks = set(dot_value)
        if id_vars and checks.intersection(id_vars):
            raise ValueError(
                f"The new column names associated with .value -> {checks} "
                "are duplicated in id_vars."
            )
        is_other = [not flag for flag in is_value]
        others = [tuple(compress(extract, is_other)) for extract in output]
        headers_for_others = [name for name in column_names if name != '.value']
        frames = list(data[:, measure_vars])
        # Group the renamed single-column frames by their non-'.value' pieces.
        out = defaultdict(list)
        for key, value_column, frame in zip(others, dot_value, frames):
            frame.names = [value_column]
            out[key].append(frame)
        blocks = []
        for key, group in out.items():
            header = dt.Frame([key], names=headers_for_others)
            block = dt.cbind(group, force=True)
            blocks.append(dt.cbind(dt.repeat(header, block.nrows), block, force=True))
        out = dt.rbind(blocks, force=True)
        if id_vars:
            id_vars = dt.repeat(data[:, id_vars], out.nrows // data.nrows)
            return dt.cbind([id_vars, out], force=True)
        return out

    if not isinstance(variable_name, (str, tuple, dict, Pattern)):
        raise TypeError('variable_name should be one of string, tuple, dictionary, regular expression.')

    if isinstance(variable_name, str):
        if not isinstance(value_name, str):
            raise TypeError('value_name should be a string.')
        if value_name == variable_name:
            raise ValueError(
                f"{value_name} is duplicated as variable_name. "
                f"Kindly provide a unique argument for {value_name}.")
        if id_vars:
            if variable_name in id_vars:
                raise ValueError(
                    f"{variable_name} already exists as a label "
                    "in id_vars. Kindly provide a unique argument.")
            if value_name in id_vars:
                raise ValueError(
                    f"{value_name} already exists as a label "
                    "in id_vars. Kindly provide a unique argument.")
        output = dt.Frame({variable_name: measure_vars})
        # Repeat each label once per row of the original frame.
        output = output[np.repeat(range(output.nrows), data.nrows), :]
        return reshape_no_dot(measure_vars=measure_vars, output=output, data=data, id_vars=id_vars)

    if isinstance(variable_name, Pattern):
        if not re.compile(variable_name).groups:
            raise ValueError("The regex should have at least one group.")
        # groupdict() only reports named groups, so unnamed groups alone
        # would yield no new columns at all.
        if not variable_name.groupindex:
            raise ValueError(
                "The groups in the regex should be named, "
                "since the names are used as the new column names.")
        output = [re.search(variable_name, word) for word in measure_vars]
        no_matches = [word for word, match in zip(measure_vars, output) if not match]
        if no_matches:
            raise ValueError(
                f"There was no match for labels {no_matches} "
                "for the provided regular expression.")
        output = [entry.groupdict() for entry in output]
        if id_vars:
            checks = set(output[0]).intersection(id_vars)
            if checks:
                raise ValueError(
                    f"Labels {checks} already exist in id_vars. "
                    "Kindly provide unique names for the named groups "
                    "in the regular expression."
                )
        output = dt.Frame(output)
        output = output[np.repeat(range(output.nrows), data.nrows), :]
        return reshape_no_dot(measure_vars=measure_vars, output=output, data=data, id_vars=id_vars)

    if isinstance(variable_name, dict):
        # Only compare against id_vars when there are any; intersecting
        # with None would raise a TypeError.
        if id_vars:
            checks = set(variable_name).intersection(id_vars)
            if checks:
                raise ValueError(
                    f"Labels {checks} already exist in id_vars. "
                    "Kindly provide keys for the dictionary "
                    "that do not exist in id_vars."
                )
        for key, regex in variable_name.items():
            if not isinstance(key, str):
                raise TypeError(f"{key} should be a string.")
            if not isinstance(regex, (str, Pattern)):
                raise TypeError(
                    f"The value for {key} should be a regular expression, "
                    "or can be compiled into one."
                )
            if re.compile(regex).groups:
                raise ValueError("The regex should not have any groups.")
        output = []
        for key, regex in variable_name.items():
            out = [word for word in measure_vars if re.search(regex, word)]
            if not out:
                raise ValueError(
                    f"There was no match for {key} for regex => {regex}"
                )
            # A column claimed by one key is not offered to later keys.
            measure_vars = [word for word in measure_vars if word not in out]
            frames = []
            for frame in data[:, out]:
                frame.names = [key]
                frames.append(frame)
            if len(frames) == 1:
                output.append(frames[0])
            else:
                output.append(dt.rbind(frames, force=True))
        output = dt.cbind(output, force=True)
        if id_vars:
            id_vars = dt.repeat(data[:, id_vars], output.nrows // data.nrows)
            return dt.cbind([id_vars, output])
        return output

    if isinstance(variable_name, tuple):
        variable_name = measure(*variable_name)
        column_names, sep, pattern = variable_name
        if not column_names:
            raise ValueError("Kindly provide argument for column_names, in the variable_name tuple.")
        if not isinstance(column_names, (str, list)):
            raise TypeError('column_names should be one of string, list.')
        if isinstance(column_names, str):
            column_names = [column_names]
        if id_vars:
            # '.value' is a placeholder, not a real column name.
            checks = set(column_names)
            checks.discard(".value")
            checks = checks.intersection(id_vars)
            if checks:
                raise ValueError(
                    f"Labels {checks} already exist in id_vars. "
                    "Kindly provide unique column_names "
                    "that do not exist in id_vars."
                )
        if not (sep or pattern):
            raise ValueError("Kindly provide one of sep or pattern.")
        if sep and pattern:
            raise ValueError("only one of sep or pattern should be provided.")

        if sep:
            if not isinstance(sep, (str, Pattern)):
                raise TypeError(
                    "sep should be a regular expression, "
                    "or can be compiled into one.")
            output = [re.split(sep, word) for word in measure_vars]
            checks = max(map(len, output))
            if len(column_names) != checks:
                raise ValueError(
                    f"The maximum number of splits for sep -> {sep} is {checks} "
                    f"while the number of labels in {column_names} "
                    f"is {len(column_names)}"
                )
            if '.value' not in column_names:
                output = [*map(tuple, output)]
                output = dt.Frame(output, names=column_names)
                output = output[np.repeat(range(output.nrows), data.nrows), :]
                return reshape_no_dot(measure_vars=measure_vars, output=output, data=data, id_vars=id_vars)
            return reshape_dot(column_names, data, measure_vars, output, id_vars=id_vars)

        if pattern:
            if not isinstance(pattern, (str, Pattern)):
                raise TypeError(
                    "pattern should be a regular expression, "
                    "or can be compiled into one.")
            checks = re.compile(pattern).groups
            if not checks:
                raise ValueError("The regex should have at least one group.")
            if checks != len(column_names):
                raise ValueError(
                    "The number of groups in the regex "
                    "should match the number of labels in column_names. "
                    f"The number of groups in the regex is {checks}, "
                    f"while the length of column_names is {len(column_names)}")
            matches = [re.search(pattern, word) for word in measure_vars]
            no_matches = [word for word, match in zip(measure_vars, matches) if not match]
            if no_matches:
                raise ValueError(
                    f"There was no match for labels {no_matches} "
                    "for the provided regular expression.")
            # groups() always yields one tuple per label, even for a single
            # group; "" stands in for groups that did not participate.
            output = [match.groups("") for match in matches]
            if '.value' not in column_names:
                output = dt.Frame(output, names=column_names)
                output = output[np.repeat(range(output.nrows), data.nrows), :]
                return reshape_no_dot(measure_vars=measure_vars, output=output, data=data, id_vars=id_vars)
            return reshape_dot(column_names, data, measure_vars, output, id_vars=id_vars)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment