# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @xenups
# Last active January 11, 2024 22:35
# Show Gist options
#   • Save xenups/aeae36334689b2b0cfbec868b4e3d220 to your computer and use it in GitHub Desktop.
from collections import defaultdict
from typing import Dict, List, Any
def hash_outer_join_two_tuple(list1, list2, list1_fields, list2_fields):
    """Full outer hash join of two sequences of positional rows.

    Rows are grouped by the tuple of values found at the given positions and
    then combined: every pair of rows sharing a key is concatenated, and rows
    whose key exists on one side only are padded with ``None`` on the other.

    Args:
        list1: Left rows; each row is indexable by the positions in
            ``list1_fields``.
        list2: Right rows; each row is indexable by the positions in
            ``list2_fields``.
        list1_fields: Positions in a left row that form the join key.
        list2_fields: Positions in a right row that form the join key.

    Returns:
        A list of tuples, each a left row followed by a right row. Matched
        rows come first, then left-only rows, then right-only rows; inside
        each group the order follows first appearance in the input.

    Raises:
        ValueError: If a key position is out of range for some row.
    """

    def pk(r, fields):
        # Join key of a row: the values at the requested positions.
        try:
            return tuple(r[i] for i in fields)
        except IndexError as err:
            raise ValueError("Index out of range") from err

    groups1 = defaultdict(list)
    groups2 = defaultdict(list)
    for row in list1:
        groups1[pk(row, list1_fields)].append(row)
    for row in list2:
        groups2[pk(row, list2_fields)].append(row)

    # Padding widths are taken from the first row of each side. An empty side
    # contributes no columns, so it must not be indexed.
    width1 = len(list1[0]) if len(list1) else 0
    width2 = len(list2[0]) if len(list2) else 0

    def merge_two(r1, r2):
        # ``None`` marks the missing side; an empty row is still a real row.
        left = list(r1) if r1 is not None else [None] * width1
        right = list(r2) if r2 is not None else [None] * width2
        return tuple(left + right)

    result = []
    # Walk the groups in insertion order so the output is deterministic
    # (iterating sets of string keys varies with the hash seed).
    for key, left_rows in groups1.items():
        if key in groups2:
            for left in left_rows:
                for right in groups2[key]:
                    result.append(merge_two(left, right))
    for key, left_rows in groups1.items():
        if key not in groups2:
            for left in left_rows:
                result.append(merge_two(left, None))
    for key, right_rows in groups2.items():
        if key not in groups1:
            for right in right_rows:
                result.append(merge_two(None, right))
    return result
def join_two_dict(_dict1: Dict, _dict2: Dict, field_list1: List, field_list2: List) -> List[Dict[Any, Any]]:
    """Full outer join of two single records on the given key fields.

    The result columns are the keys that appear only in ``_dict1`` (in their
    original order) followed by every key of ``_dict2`` (in its order).

    Args:
        _dict1: Left record.
        _dict2: Right record.
        field_list1: Keys of ``_dict1`` whose values form the left join key.
        field_list2: Keys of ``_dict2`` whose values form the right join key.

    Returns:
        A one-element list holding the merged record when the join keys are
        equal (for columns present in both records the value from ``_dict2``
        is used). Otherwise a two-element list: the left record followed by
        the right record, each padded with ``None`` for columns it lacks.

    Raises:
        ValueError: If a requested field is missing from its record.
    """
    for field in field_list1:
        if field not in _dict1:
            raise ValueError(f"Field '{field}' not found in _dict1")
    for field in field_list2:
        if field not in _dict2:
            raise ValueError(f"Field '{field}' not found in _dict2")

    columns = [key for key in _dict1 if key not in _dict2] + list(_dict2)

    left_key = tuple(_dict1[field] for field in field_list1)
    right_key = tuple(_dict2[field] for field in field_list2)

    if left_key == right_key:
        merged = {
            column: _dict2[column] if column in _dict2 else _dict1[column]
            for column in columns
        }
        return [merged]

    # No match: emit each record on its own row. Columns shared by both
    # records (the join keys in particular) keep the value of the record the
    # row came from rather than being blanked out by the absent side.
    left_row = {column: _dict1.get(column) for column in columns}
    right_row = {column: _dict2.get(column) for column in columns}
    return [left_row, right_row]
def multiple_join(lists: List[Dict], field_lists: List[List]) -> List[Dict[Any, Any]]:
    """Join several records left to right, two at a time.

    The first two records are joined with ``join_two_dict``; the first row of
    that result is then joined with the next record, and so on. Each join
    consumes two consecutive entries of ``field_lists``: the key fields for
    the left side, then the key fields for the right side.

    Args:
        lists: Records (dicts, or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists, at least two per join performed.

    Returns:
        The rows produced by the last join. With a single input record the
        result is a one-element list holding a copy of it; with no input it
        is an empty list.

    Raises:
        ValueError: If there are fewer than ``2 * (len(lists) - 1)`` field
            lists.
    """
    if not lists:
        return []
    if len(lists) == 1:
        return [dict(lists[0])]
    if len(field_lists) < 2 * (len(lists) - 1):
        raise ValueError("fields length is not valid")

    joined_result = []
    while len(lists) > 1:
        joined_result = join_two_dict(dict(lists[0]), dict(lists[1]), field_lists[0], field_lists[1])
        # Only the first joined row is carried into the next join.
        lists = [joined_result[0]] + list(lists[2:])
        # Drop the pair just used so the next join reads the next pair.
        field_lists = field_lists[2:]
    return joined_result
def recursive_join(lists: List[Dict], field_lists: List[List]) -> List[Dict[Any, Any]]:
    """Recursively join several records left to right.

    The first two records are joined with ``join_two_dict`` using
    ``field_lists[0]`` and ``field_lists[1]``; the first row of that result
    replaces them and the function recurses with the remaining field lists.

    Args:
        lists: Records (dicts, or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists; exactly two entries per input record
            are required. The final pair is checked for but never read,
            because N records need only N - 1 joins.

    Returns:
        The rows produced by the last join. With a single input record the
        result is a one-element list holding a copy of it; with no input it
        is an empty list.

    Raises:
        ValueError: If ``len(field_lists) != 2 * len(lists)``.
    """
    if 2 * len(lists) != len(field_lists):
        raise ValueError("fields length is not valid")
    if not lists:
        return []
    if len(lists) == 1:
        return [dict(lists[0])]

    first_two_result = join_two_dict(dict(lists[0]), dict(lists[1]), field_lists[0], field_lists[1])
    if len(lists) == 2:
        return first_two_result

    # Only the first joined row is carried forward; dropping one record and
    # two field lists keeps the length invariant for the recursive call.
    remaining_lists = [first_two_result[0]] + list(lists[2:])
    return recursive_join(remaining_lists, field_lists[2:])
def iterative_join(lists: List[Dict], field_lists: List[List]) -> Dict[Any, Any]:
    """Join several records left to right and return one merged record.

    Starting from the first record, each following record is joined onto the
    running result with ``join_two_dict``; only the first row of every join
    is kept. The join at step ``k`` uses ``field_lists[2 * k]`` for the left
    side and ``field_lists[2 * k + 1]`` for the right side.

    Args:
        lists: Records (dicts, or anything ``dict()`` accepts) to join.
        field_lists: Key-field lists; exactly two entries per input record
            are required. The final pair is checked for but never read.

    Returns:
        A new dict holding the first row of the final join, a copy of the
        only record when one is given, or an empty dict for empty input.

    Raises:
        ValueError: If ``len(field_lists) != 2 * len(lists)``.
    """
    if 2 * len(lists) != len(field_lists):
        raise ValueError("fields length is not valid")
    if not lists:
        return {}

    current = dict(lists[0])
    for step, record in enumerate(lists[1:]):
        rows = join_two_dict(dict(current), dict(record), field_lists[2 * step], field_lists[2 * step + 1])
        current = rows[0]
    return dict(current)
# Demo: join three sample records on their key fields and print the outcome
# of both the recursive and the iterative implementation.
dict1 = dict(id=1, name='Alice', age=25, x='yes')
dict2 = dict(id=1, city='New York', name='Alice')
dict3 = dict(id=1, occupation='Engineer', x='yes', name='Alice')
dict4 = dict(id=1, a='a', b='b')  # defined but not passed to either call below

# Two key-field lists per input record, as required by the length check in
# recursive_join / iterative_join.
field_lists = [
    ['id', 'name'],
    ['id', 'name'],
    ['id', 'name'],
    ['id', 'name'],
    ['id'],
    ['id'],
]

y = recursive_join([dict1, dict2, dict3], field_lists=field_lists)
x = iterative_join([dict1, dict2, dict3], field_lists=field_lists)

print(y)
print(x)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment