Created
September 13, 2020 19:23
-
-
Save Iftimie/a365ce393e541d21b3a31bd857f7993b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
# Sentinel bounds used as the open ends of the partition ranges below.
# NOTE(review): chr(255) is U+00FF, so a phrase whose first character sorts
# at or above it falls outside every partition - confirm this is intended.
min_lexicographic_char = chr(0)
max_lexicographic_char = chr(255)

# Each entry is a (start, end) pair; TrieBuilder treats it as the half-open
# range start <= phrase < end.
PARTITIONS = (
    (min_lexicographic_char, 'mod'),
    ('mod', max_lexicographic_char),
)
... | |
class TrieBuilder:
    """Build one trie per entry of PARTITIONS and publish it.

    This is an excerpt: the collaborators used here (``self._hdfsClient``,
    ``self._logger``, ``self._get_trie_remote_hdfs_path``,
    ``self._register_trie_zookeeper`` and
    ``self._register_next_target_zookeeper``) are defined outside the
    visible code.
    """

    def build(self, target_id):
        """Create, pickle, upload and register a trie for every partition.

        Args:
            target_id: Identifier of the target being built; it is passed
                through to every helper call.

        For each ``(start, end)`` pair in ``PARTITIONS`` the trie is pickled
        to the local file ``trie.dat``, uploaded to its remote HDFS path and
        registered. The next target is registered once, after all
        partitions have been processed.
        """
        trie_local_file_name = "trie.dat"
        for start, end in PARTITIONS:
            trie = self._create_trie(target_id, start, end)
            # Close (and therefore flush) the file before uploading it, so
            # the upload never sees a partially written pickle and the
            # handle is not leaked.
            with open(trie_local_file_name, "wb") as trie_file:
                pickle.dump(trie, trie_file)
            trie_remote_hdfs_path = self._get_trie_remote_hdfs_path(target_id, start, end)
            self._hdfsClient.upload_to_hdfs(trie_local_file_name, trie_remote_hdfs_path)
            self._register_trie_zookeeper(target_id, trie_remote_hdfs_path, start, end)
        self._register_next_target_zookeeper(target_id)

    def _create_trie(self, target_id, start, end):
        """Return a trie holding the phrases that satisfy start <= phrase < end.

        Args:
            target_id: Appended to ``/phrases/2_targets/`` to form the path
                that is listed on HDFS.
            start: Inclusive lower bound of the partition.
            end: Exclusive upper bound of the partition.
        """
        slidingwindows = self._hdfsClient.list("/phrases/2_targets/" + target_id)
        # Elided in the original excerpt: the code that creates `trie` and
        # `reader` (presumably from `slidingwindows`) lives here.
        ...
        for record in reader:
            phrase = record['phrase']
            # Lazy logger arguments: the message is only formatted when
            # debug logging is enabled.
            self._logger.debug("Data is %s", phrase)
            if start <= phrase < end:
                trie.add_phrase(phrase)
        return trie
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
# Fallback bounds substituted when a partition name leaves its start or end
# empty; they are also the range of the initial, empty trie.
min_lexicographic_char = chr(0x00)
max_lexicographic_char = chr(0xFF)
... | |
class Backend:
    """Serve top-phrase lookups from a set of range-partitioned tries.

    ``self._tries`` is a list of ``(trie, start, end)`` tuples; a prefix is
    answered by the first trie whose range satisfies
    ``start <= prefix < end``.

    This is an excerpt: ``self._zk`` and ``self._load_trie`` are defined
    outside the visible code.
    """

    def __init__(self):
        # Start with a single empty trie that spans the whole sentinel range.
        self._tries = [(Trie(), min_lexicographic_char, max_lexicographic_char)]

    def _on_next_target_changed(self, data, stat, event=None):
        """Load the tries of the newly announced target and start serving them.

        Args:
            data: Encoded id of the next target, or None.
            stat: Unused.
            event: Unused.

        The replacement list is built completely before it is assigned to
        ``self._tries``. A lookup running concurrently therefore never sees
        an empty or half-filled list, and a failure while loading leaves the
        previously served tries in place.
        """
        # Presumably this is a ZooKeeper data watch, which can fire without
        # data (e.g. the node does not exist yet); there is nothing to load.
        if data is None:
            return
        next_target_id = data.decode()
        partitions_path = f'/phrases/distributor/{next_target_id}/partitions'
        loaded_tries = []
        for partition in self._zk.get_children(partitions_path):
            trie_data_hdfs_path = f'{partitions_path}/{partition}/trie_data_hdfs_path'
            trie = self._load_trie(self._zk.get(trie_data_hdfs_path)[0].decode())
            # Partition names have the form "<start>|<end>"; an empty side
            # falls back to the corresponding sentinel bound.
            start, end = partition.split('|')
            loaded_tries.append((
                trie,
                start or min_lexicographic_char,
                end or max_lexicographic_char,
            ))
        # Single assignment: swap the whole set in at once.
        self._tries = loaded_tries

    def top_phrases_for_prefix(self, prefix):
        """Return the top phrases for ``prefix`` from the matching partition.

        Returns whatever the matching trie's ``top_phrases_for_prefix``
        returns, or None when no partition range contains ``prefix``.
        NOTE(review): with the chr(0) lower bound an empty prefix matches no
        partition - confirm callers handle None.
        """
        for trie, start, end in self._tries:
            if start <= prefix < end:
                return trie.top_phrases_for_prefix(prefix)
        return None
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment