Last active
January 11, 2024 22:35
-
-
Save xenups/aeae36334689b2b0cfbec868b4e3d220 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from typing import Dict, List, Any | |
def hash_outer_join_two_tuple(list1, list2, list1_fields, list2_fields):
    """Full outer join of two row collections using hash buckets.

    Rows are indexable sequences (tuples or lists). Every row is bucketed by
    the tuple of values found at its key positions. Buckets whose key exists
    on both sides are cross-multiplied; rows whose key exists on one side
    only are padded with ``None`` for the missing side.

    Args:
        list1: Left rows (a sequence of sequences).
        list2: Right rows (a sequence of sequences).
        list1_fields: Positions inside a left row that form the join key.
        list2_fields: Positions inside a right row that form the join key.

    Returns:
        A list of tuples, each one a left row followed by a right row.
        Matched rows come first, then left-only rows, then right-only rows;
        inside each group rows follow the order in which their key was
        first seen in the input.

    Raises:
        ValueError: If a key position lies outside a row.
    """

    def pk(row, fields):
        # Build the hashable join key for one row.
        try:
            return tuple(row[i] for i in fields)
        except IndexError as err:
            raise ValueError("Index out of range") from err

    def bucket(rows, fields):
        # Group rows by join key, preserving first-seen key order.
        buckets = defaultdict(list)
        for row in rows:
            buckets[pk(row, fields)].append(row)
        return buckets

    left = bucket(list1, list1_fields)
    right = bucket(list2, list2_fields)

    # The padding width is taken from the first row of each side. An empty
    # side has no known width, so it contributes no padding columns instead
    # of failing on ``rows[0]``.
    left_pad = (None,) * (len(list1[0]) if len(list1) else 0)
    right_pad = (None,) * (len(list2[0]) if len(list2) else 0)

    result = []
    # Membership tests (not indexing) are used for lookups so the
    # defaultdicts never grow empty buckets while we iterate them.
    for key, left_rows in left.items():
        if key in right:
            right_rows = right[key]
            result.extend(
                tuple(l_row) + tuple(r_row)
                for l_row in left_rows
                for r_row in right_rows
            )
    for key, left_rows in left.items():
        if key not in right:
            result.extend(tuple(l_row) + right_pad for l_row in left_rows)
    for key, right_rows in right.items():
        if key not in left:
            result.extend(left_pad + tuple(r_row) for r_row in right_rows)
    return result
def join_two_dict(_dict1: Dict, _dict2: Dict, field_list1: List, field_list2: List) -> List[Dict[Any, Any]]:
    """Full outer join of two single-row records given as dicts.

    The join key of each record is the tuple of its values for the listed
    fields. The output columns are the keys that exist only in ``_dict1``
    (in their original order) followed by every key of ``_dict2``.

    Args:
        _dict1: Left record.
        _dict2: Right record.
        field_list1: Keys of ``_dict1`` that form its join key.
        field_list2: Keys of ``_dict2`` that form its join key.

    Returns:
        A list of row dicts. When the join keys are equal there is one
        merged row in which ``_dict2`` supplies the value of any column both
        records share. When they differ there are two rows: the left record
        followed by the right record, each with ``None`` in the columns the
        other record alone provides.

    Raises:
        ValueError: If a listed field is missing from its record.
    """
    # Validate fields in field_list1
    for field in field_list1:
        if field not in _dict1:
            raise ValueError(f"Field '{field}' not found in _dict1")
    # Validate fields in field_list2
    for field in field_list2:
        if field not in _dict2:
            raise ValueError(f"Field '{field}' not found in _dict2")

    left_only_columns = [key for key in _dict1 if key not in _dict2]
    columns = left_only_columns + list(_dict2)

    left_key = tuple(_dict1[field] for field in field_list1)
    right_key = tuple(_dict2[field] for field in field_list2)

    if left_key == right_key:
        merged = {key: _dict1[key] for key in left_only_columns}
        merged.update(_dict2)
        return [merged]

    # No match: emit each record on its own row. The left row keeps its own
    # values for columns shared with the right record (join key included)
    # rather than inheriting the right side's None padding.
    left_row = {key: _dict1.get(key) for key in columns}
    right_row = {key: _dict2.get(key) for key in columns}
    return [left_row, right_row]
def multiple_join(lists: List[Dict], field_lists: List[List]):
    """Join a sequence of records left to right with ``join_two_dict``.

    Each join step consumes two entries of ``field_lists``: the key fields
    for the accumulated left record and the key fields for the next record.
    Only the first row returned by a step is carried into the next step.

    Args:
        lists: Records (dicts or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists, two per join step. Extra trailing
            entries are ignored.

    Returns:
        The list of rows produced by the last join step. With fewer than
        two records no join happens and the records are returned as a list
        of dict copies.

    Raises:
        ValueError: If there are fewer than two field lists per join step.
    """
    if len(lists) > 1 and len(field_lists) < 2 * (len(lists) - 1):
        raise ValueError("fields length is not valid")

    # Defined up front so the function also returns cleanly when the loop
    # body never runs (zero or one input record).
    joined_result = [dict(record) for record in lists]
    while len(lists) > 1:
        joined_result = join_two_dict(dict(lists[0]), dict(lists[1]), field_lists[0], field_lists[1])
        lists = [joined_result[0]] + lists[2:]
        # Advance past the two field lists this step consumed.
        field_lists = field_lists[2:]
    return joined_result
def recursive_join(lists: List[Dict], field_lists: List[List]):
    """Recursively join records left to right with ``join_two_dict``.

    The first two records are joined using the first two field lists; the
    first row of that result then replaces them and the function recurses on
    the shortened inputs. Any further rows a step returns are discarded.

    Args:
        lists: Records (dicts or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists; exactly two entries per record are
            required, although the final pair is never consumed by a join.

    Returns:
        The list of rows returned by the last ``join_two_dict`` call, or a
        one-element list holding a copy of the record when only one record
        is supplied.

    Raises:
        ValueError: If ``field_lists`` does not hold two entries per record,
            or if ``lists`` is empty.
    """
    if 2 * len(lists) != len(field_lists):
        raise ValueError("fields length is not valid")
    if not lists:
        raise ValueError("at least one dict is required")
    if len(lists) == 1:
        # Nothing to join; mirror the list-of-rows return shape.
        return [dict(lists[0])]

    first_two_result = join_two_dict(dict(lists[0]), dict(lists[1]), field_lists[0], field_lists[1])
    if len(lists) == 2:
        return first_two_result

    # Dropping two field lists per step keeps the two-per-record invariant
    # that the length check above enforces on the recursive call.
    remaining_lists = [first_two_result[0]] + lists[2:]
    return recursive_join(remaining_lists, field_lists[2:])
def iterative_join(lists: List[Dict], field_lists: List[List]):
    """Iteratively join records left to right with ``join_two_dict``.

    Starting from the first record, each following record is joined onto the
    accumulated result. Step ``k`` (zero-based) uses ``field_lists[2 * k]``
    for the accumulated record and ``field_lists[2 * k + 1]`` for the next
    one. Only the first row returned by each step is kept.

    Args:
        lists: Records (dicts or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists; exactly two entries per record are
            required, although the final pair is never consumed by a join.

    Returns:
        A single dict: the accumulated record after the last join step, or a
        copy of the only record when just one is supplied.

    Raises:
        ValueError: If ``field_lists`` does not hold two entries per record,
            or if ``lists`` is empty.
    """
    if 2 * len(lists) != len(field_lists):
        raise ValueError("fields length is not valid")
    if not lists:
        raise ValueError("at least one dict is required")

    merged = dict(lists[0])
    for step, record in enumerate(lists[1:]):
        rows = join_two_dict(dict(merged), dict(record), field_lists[2 * step], field_lists[2 * step + 1])
        merged = rows[0]
    return dict(merged)
# Demo: join three single-row records with both drivers and print the results.
# Two key-field lists are supplied per record because the drivers' length
# check requires it; joining three records only consumes the first four.
field_lists = [
    ['id', 'name'], ['id', 'name'],
    ['id', 'name'], ['id', 'name'],
    ['id'], ['id'],
]

dict1 = dict(id=1, name='Alice', age=25, x='yes')
dict2 = dict(id=1, city='New York', name='Alice')
dict3 = dict(id=1, occupation='Engineer', x='yes', name='Alice')
dict4 = dict(id=1, a='a', b='b')  # defined but not passed to either call below

y = recursive_join([dict1, dict2, dict3], field_lists)
x = iterative_join([dict1, dict2, dict3], field_lists)

print(y, x, sep="\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment