Created
September 13, 2020 19:52
-
-
Save Iftimie/f1ef802ea518315f2e9a0be366c68009 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
...
class Backend:
    """Backend node that claims one partition of a distribution target.

    ``partition_index`` is computed once at construction time from the numeric
    suffix of this container's Docker name and is later used to pick which
    partition znode this backend registers itself under.
    """

    def __init__(self):
        self._trie = Trie()
        self.partition_index = self.find_self_partition()

    def find_self_partition(self):
        """Return the zero-based partition index derived from our container name.

        Lists containers through the local Docker socket, selects the one whose
        12-character id prefix equals ``HOSTNAME[:12]``, and converts the text
        after the last ``_`` of its first name into ``int(...) - 1``.

        Raises:
            IndexError: if no listed container matches ``HOSTNAME``.
            ValueError: if the name suffix is not an integer.
        """
        cli = docker.Client(base_url='unix://var/run/docker.sock')
        # Identify ourselves among the listed containers by HOSTNAME.
        matches = [c for c in cli.containers() if c['Id'][:12] == HOSTNAME[:12]]
        if not matches:
            # Same exception type the bare ``[0]`` used to raise, with a message.
            raise IndexError(f'no container id matches HOSTNAME {HOSTNAME!r}')
        # NOTE(review): presumably names look like "<project>_<service>_<n>"
        # with n starting at 1 (docker-compose scaling) — confirm.
        return int(matches[0]['Names'][0].split('_')[-1]) - 1

    ...  # further members are elided in the original snippet

    def _attempt_to_join_target(self, target_id):
        """Register this backend for its partition of ``target_id`` and load its trie.

        Does nothing when ``target_id`` is falsy or the target znode does not
        exist. Otherwise writes this host's name into the partition znode and
        replaces ``self._trie`` with the trie loaded from the path stored in
        the partition's ``trie_data_hdfs_path`` child.

        Raises:
            IndexError: if ``partition_index`` is outside the partition list.
        """
        target_path = f'/phrases/distributor/{target_id}'
        if not target_id or self._zk.exists(target_path) is None:
            return

        # ZooKeeper returns children in no guaranteed order; sort so that every
        # backend indexes into the same sequence and no two backends end up
        # claiming the same partition.
        partitions = sorted(self._zk.get_children(f'{target_path}/partitions'))
        partition = partitions[self.partition_index]

        node_path = f'{target_path}/partitions/{partition}'
        # Publish our hostname as the partition znode's data.
        self._zk.set(node_path, socket.gethostname().encode())

        trie_data_hdfs_path = f'{node_path}/trie_data_hdfs_path'
        self._trie = self._load_trie(self._zk.get(trie_data_hdfs_path)[0].decode())
...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Znode whose data is the id of the distribution target the frontend routes to.
ZK_NEXT_TARGET = '/phrases/distributor/next_target'


class Frontend:
    """Routes prefix queries to the backend that owns the matching partition."""

    def __init__(self):
        # NOTE(review): the client is only constructed here; no start() call is
        # visible in this snippet — confirm the connection is opened elsewhere.
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')

    def top_phrases_for_prefix(self, prefix):
        """Return the top phrases for ``prefix`` from the responsible backend.

        Returns an empty list when no partition covers ``prefix`` or when the
        covering partition has no hostname stored yet, instead of issuing a
        request to an unusable URL such as ``http://None:6000/...``.
        """
        backend_hostname = self.backend_for_prefix(prefix)
        if not backend_hostname:
            return []
        r = requests.get(f'http://{backend_hostname}:6000/top-phrases', params={'prefix': prefix})
        return r.json()["data"]["top_phrases"]

    def backend_for_prefix(self, prefix):
        """Return the hostname stored on the partition covering ``prefix``.

        Partition znodes are named ``"<start>|<end>"``; an empty bound is
        open-ended. The first partition with ``start <= prefix < end`` wins.

        Returns:
            The decoded znode data of the matching partition (possibly an
            empty string), or ``None`` if no partition matches.
        """
        target_id = self._zk.get(ZK_NEXT_TARGET)[0].decode()
        partitions_path = f'/phrases/distributor/{target_id}/partitions'
        for partition in self._zk.get_children(partitions_path):
            start, end = partition.split('|')
            if (not start or prefix >= start) and (not end or prefix < end):
                return self._zk.get(f'{partitions_path}/{partition}')[0].decode()
        return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compose service for the distributor frontend (a gunicorn-served app on port 7000).
distributor.frontend:
  build: ./distributor/frontend
  depends_on:
    - zookeeper
    - assembler.hadoop.namenode
  environment:
    - ZOOKEEPER_HOST=zookeeper
  ports:
    # Only the container port is given, so the host port is assigned by Docker.
    - "7000"
  volumes:
    # Bind-mount main.py so that gunicorn's --reload picks up local edits.
    - ./distributor/frontend/main.py:/app/distributor/frontend/main.py
  command: gunicorn --chdir /app/distributor/frontend main:app -b 0.0.0.0:7000 --reload
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment