Skip to content

Instantly share code, notes, and snippets.

@Iftimie
Created September 13, 2020 19:23
Show Gist options
  • Save Iftimie/a365ce393e541d21b3a31bd857f7993b to your computer and use it in GitHub Desktop.
Save Iftimie/a365ce393e541d21b3a31bd857f7993b to your computer and use it in GitHub Desktop.
...
min_lexicographic_char = chr(0)
max_lexicographic_char = chr(255)
PARTITIONS = (min_lexicographic_char, 'mod'), ('mod', max_lexicographic_char)
...
class TrieBuilder:
def build(self, target_id):
for start, end in PARTITIONS:
trie = self._create_trie(target_id, start, end)
trie_local_file_name = "trie.dat"
pickle.dump(trie, open(trie_local_file_name, "wb"))
trie_remote_hdfs_path = self._get_trie_remote_hdfs_path(target_id, start, end)
self._hdfsClient.upload_to_hdfs(trie_local_file_name, trie_remote_hdfs_path)
self._register_trie_zookeeper(target_id, trie_remote_hdfs_path, start, end)
self._register_next_target_zookeeper(target_id)
def _create_trie(self, target_id, start, end):
slidingwindows = self._hdfsClient.list("/phrases/2_targets/" + target_id)
...
for record in reader:
phrase = record['phrase']
self._logger.debug("Data is %s" % phrase)
if not start<=phrase<end:
continue
trie.add_phrase(phrase)
return trie
...
...
min_lexicographic_char = chr(0)
max_lexicographic_char = chr(255)
...
class Backend:
def __init__(self):
self._tries = [(Trie(), min_lexicographic_char, max_lexicographic_char)]
def _on_next_target_changed(self, data, stat, event=None):
next_target_id = data.decode()
partitions = self._zk.get_children(f'/phrases/distributor/{next_target_id}/partitions')
self._tries = []
for partition in partitions:
trie_data_hdfs_path = f'/phrases/distributor/{next_target_id}/partitions/{partition}/trie_data_hdfs_path'
trie = self._load_trie(self._zk.get(trie_data_hdfs_path)[0].decode())
start, end = partition.split('|')
if not start: start = min_lexicographic_char
if not end: end = max_lexicographic_char
self._tries.append((trie, start, end))
def top_phrases_for_prefix(self, prefix):
for trie, start, end in self._tries:
if start <= prefix < end:
return trie.top_phrases_for_prefix(prefix)
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment