Skip to content

Instantly share code, notes, and snippets.

@Iftimie
Created September 13, 2020 19:52
Show Gist options
  • Save Iftimie/f1ef802ea518315f2e9a0be366c68009 to your computer and use it in GitHub Desktop.
...
class Backend:
    """Backend that serves the trie for one partition of a distributor target.

    Only part of the class is visible in this snippet: ``self._zk`` (a
    ZooKeeper client) and ``self._load_trie`` are used below but defined
    elsewhere.
    """

    def __init__(self):
        self._trie = Trie()
        # Zero-based replica index; selects which partition this backend claims.
        self.partition_index = self.find_self_partition()

    def find_self_partition(self):
        """Return this container's zero-based replica index.

        Lists containers through the local Docker socket and finds the one
        whose 12-char id prefix equals ``HOSTNAME[:12]``. It then parses the
        trailing replica number ``n`` from the container's first name and
        returns ``n - 1``. docker-compose v1 names containers
        ``<project>_<service>_<n>``; v2 uses ``<project>-<service>-<n>``.
        Both forms are accepted.

        Raises:
            IndexError: if no listed container matches ``HOSTNAME``.
            ValueError: if the container name does not end in a number.
        """
        cli = docker.Client(base_url='unix://var/run/docker.sock')
        try:
            all_containers = cli.containers()
        finally:
            # docker.Client is a requests.Session; close it so the socket
            # connection is not leaked.
            cli.close()
        # By default Docker uses the 12-char short container id as hostname,
        # which is how we recognise our own container in the listing.
        our_id = HOSTNAME[:12]
        our_container = next(
            (c for c in all_containers if c['Id'][:12] == our_id), None)
        if our_container is None:
            raise IndexError(
                f'no listed Docker container matches HOSTNAME {our_id!r}')
        name = our_container['Names'][0]
        # Normalise '-' (compose v2) to '_' (compose v1) before taking the
        # trailing replica number.
        replica_number = name.replace('-', '_').rsplit('_', 1)[-1]
        return int(replica_number) - 1

    # ... (other Backend members are elided in this snippet)

    def _attempt_to_join_target(self, target_id):
        """Claim this backend's partition of ``target_id`` and load its trie.

        Does nothing if ``target_id`` is empty or its znode does not exist.
        Otherwise:

        * picks the partition at ``self.partition_index``;
        * writes this host's name into that partition's znode;
        * replaces ``self._trie`` with ``self._load_trie(path)``, where
          ``path`` is read from the partition's ``trie_data_hdfs_path``
          child.

        Raises:
            IndexError: if ``self.partition_index`` is not smaller than the
                number of partitions.
        """
        target_path = f'/phrases/distributor/{target_id}'
        if not target_id or self._zk.exists(target_path) is None:
            return
        # ZooKeeper returns children in no guaranteed order. Sort by range
        # start ('' = open lower bound, first) so that partition_index maps
        # to the same partition on every backend and on every call.
        partitions = sorted(
            self._zk.get_children(f'{target_path}/partitions'),
            key=lambda p: (p.split('|')[0], p),
        )
        partition = partitions[self.partition_index]
        node_path = f'{target_path}/partitions/{partition}'
        # Advertise this host as the server for the partition.
        self._zk.set(node_path, socket.gethostname().encode())
        hdfs_path = self._zk.get(f'{node_path}/trie_data_hdfs_path')[0].decode()
        self._trie = self._load_trie(hdfs_path)
...
# Znode whose data is the id of the current distributor target; read by
# Frontend.backend_for_prefix to find the active partition set.
ZK_NEXT_TARGET = '/phrases/distributor/next_target'


class Frontend:
    """Routes prefix queries to the backend serving the matching partition."""

    def __init__(self):
        # NOTE(review): self._zk.start() is not visible in this snippet; kazoo
        # requires start() before get/get_children - confirm it is called.
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')

    def top_phrases_for_prefix(self, prefix, timeout=5.0):
        """Return the backend's ``data.top_phrases`` list for ``prefix``.

        Args:
            prefix: the query prefix to complete.
            timeout: seconds passed to ``requests.get``. The wait is bounded
                so that a hung backend cannot block the worker indefinitely.

        Raises:
            LookupError: if no backend hostname is registered for ``prefix``.
            requests.HTTPError: if the backend answers with an error status.
            requests.Timeout: if the backend does not respond in time.
        """
        backend_hostname = self.backend_for_prefix(prefix)
        if not backend_hostname:
            # None: no partition matched; '': no backend has joined it yet.
            raise LookupError(f'no backend is serving prefix {prefix!r}')
        r = requests.get(
            f'http://{backend_hostname}:6000/top-phrases',
            params={'prefix': prefix},
            timeout=timeout,
        )
        r.raise_for_status()
        return r.json()["data"]["top_phrases"]

    def backend_for_prefix(self, prefix):
        """Return the hostname stored on the partition covering ``prefix``.

        Partition znodes are named ``start|end``. An empty ``start`` or
        ``end`` means the range is unbounded on that side, and a partition
        covers ``start <= prefix < end``. Returns ``None`` when there is no
        current target or no partition covers ``prefix``.
        """
        target_id = self._zk.get(ZK_NEXT_TARGET)[0].decode()
        if not target_id:
            # No target published yet (same guard as the Backend side).
            return None
        partitions_path = f'/phrases/distributor/{target_id}/partitions'
        # Child order does not matter here: the ranges are tested one by one.
        for partition in self._zk.get_children(partitions_path):
            start, end = partition.split('|')
            if (not start or prefix >= start) and (not end or prefix < end):
                return self._zk.get(f'{partitions_path}/{partition}')[0].decode()
        return None
# docker-compose service for the distributor frontend: a gunicorn app
# listening on port 7000. Presumably nested under `services:` in the full
# compose file - the enclosing lines are not part of this snippet.
distributor.frontend:
  build: ./distributor/frontend
  # depends_on only orders container start-up; it does not wait for
  # ZooKeeper or the namenode to be ready.
  depends_on:
    - zookeeper
    - assembler.hadoop.namenode
  environment:
    # Read via os.getenv("ZOOKEEPER_HOST") by Frontend.__init__, which
    # connects to <host>:2181.
    - ZOOKEEPER_HOST=zookeeper
  ports:
    # Container port only: Docker publishes it on an ephemeral host port.
    - "7000"
  volumes:
    # Bind-mounts the source so that `gunicorn --reload` (below) picks up
    # edits without rebuilding the image.
    - ./distributor/frontend/main.py:/app/distributor/frontend/main.py
  command: gunicorn --chdir /app/distributor/frontend main:app -b 0.0.0.0:7000 --reload
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment