Skip to content

Instantly share code, notes, and snippets.

@Iftimie
Created September 13, 2020 20:15
Show Gist options
  • Save Iftimie/ed4297059775207cd51d72900d8ed5f4 to your computer and use it in GitHub Desktop.
Save Iftimie/ed4297059775207cd51d72900d8ed5f4 to your computer and use it in GitHub Desktop.
# Sentinel bounds for the partition key space: the lowest code point and
# U+00FF. Characters above U+00FF compare greater than the upper sentinel.
min_lexicographic_char = '\x00'
max_lexicographic_char = '\xff'

# Two adjacent ranges that meet at 'mod'; presumably (start, end) pairs
# describing how phrases are split across backends -- confirm against the
# code that writes the partition nodes.
PARTITIONS = (
    (min_lexicographic_char, 'mod'),
    ('mod', max_lexicographic_char),
)
class Backend:
    """Backend that registers itself under a distributor target and loads a trie.

    NOTE(review): ``self._zk``, ``self._load_trie`` and
    ``self.find_self_partition`` are used here but are not defined in the
    visible source; presumably they are provided elsewhere in the class.
    ``self._zk`` looks like a kazoo-style ZooKeeper client -- confirm.
    """

    def __init__(self):
        # Index used to choose which partition of a target this backend joins.
        self.backend_number = self.find_self_partition()

    def _attempt_to_join_target(self, target_id):
        """Register this host under one partition of ``target_id`` and load its trie.

        Does nothing when ``target_id`` is falsy, when the target node does
        not exist, or when the target has no partitions.

        Args:
            target_id: Name of the node under ``/phrases/distributor``.
        """
        if not target_id:
            return
        target_path = f'/phrases/distributor/{target_id}'
        if self._zk.exists(target_path) is None:
            return

        # Child order is not guaranteed by ZooKeeper, so sort to make the
        # backend_number -> partition mapping deterministic.
        partitions = sorted(self._zk.get_children(f'{target_path}/partitions'))
        if not partitions:
            return
        # Wrap around the partitions that actually exist for this target
        # rather than the size of the module-level PARTITIONS constant, which
        # would raise IndexError (or ignore partitions) when the two differ.
        partition = partitions[self.backend_number % len(partitions)]
        partition_path = f'{target_path}/partitions/{partition}'

        nodes_path = f'{partition_path}/nodes/'
        self._zk.ensure_path(nodes_path)
        # Create the ephemeral sequence node with the hostname already in it,
        # so readers never observe a registered node with empty data.
        self._zk.create(
            nodes_path,
            value=socket.gethostname().encode(),
            ephemeral=True,
            sequence=True,
        )

        # The node's data is passed to _load_trie after decoding.
        trie_data_hdfs_path = f'{partition_path}/trie_data_hdfs_path'
        self._trie = self._load_trie(self._zk.get(trie_data_hdfs_path)[0].decode())
class Frontend:
    """Routes prefix queries to a backend registered for the matching partition.

    NOTE(review): ``self._zk`` is used but not defined in the visible source;
    presumably a kazoo-style ZooKeeper client set up elsewhere -- confirm.
    """

    def top_phrases_for_prefix(self, prefix):
        """Return the top phrases for ``prefix`` from the responsible backend.

        Args:
            prefix: Query prefix forwarded to the backend.

        Returns:
            The ``data.top_phrases`` value of the backend's JSON response, or
            an empty list when no backend is available for the prefix.
        """
        backend_hostname = self.backend_for_prefix(prefix)
        if backend_hostname is None:
            # Without this guard the request would go to "http://None:6000".
            return []
        r = requests.get(
            f'http://{backend_hostname}:6000/top-phrases',
            params={'prefix': prefix},
        )
        return r.json()["data"]["top_phrases"]

    def backend_for_prefix(self, prefix):
        """Pick a resolvable backend hostname for the partition containing ``prefix``.

        Partition names are split on ``'|'`` into ``start`` and ``end``; an
        empty side means that side is unbounded. Candidate nodes are tried in
        random order, and nodes whose hostname cannot be resolved are deleted.

        NOTE(review): ``target_id`` is neither a parameter nor a local here;
        it has to resolve as a module-level name at call time, otherwise this
        raises NameError -- confirm where it is defined.

        Args:
            prefix: Query prefix used to select the partition.

        Returns:
            A backend hostname, or ``None`` when no usable node is found.
        """
        partitions_path = f'/phrases/distributor/{target_id}/partitions'
        for partition in self._zk.get_children(partitions_path):
            start, end = partition.split('|')
            if (start and prefix < start) or (end and prefix >= end):
                continue

            # No trailing slash, so joining a child name gives a canonical path
            # instead of ".../nodes//<child>".
            nodes_path = f'{partitions_path}/{partition}/nodes'
            nodes = self._zk.get_children(nodes_path)
            random.shuffle(nodes)
            while nodes:
                node_path = f'{nodes_path}/{nodes.pop()}'
                hostname = self._zk.get(node_path)[0].decode()
                if not hostname:
                    # An empty payload is not a usable hostname (e.g. a node
                    # whose data has not been written yet); skip it without
                    # deleting it.
                    continue
                try:
                    # gethostbyname signals failure by raising, never by
                    # returning a falsy value; malformed names fail in the
                    # IDNA encoding step with UnicodeError.
                    socket.gethostbyname(hostname)
                except (socket.gaierror, UnicodeError):
                    self._zk.delete(node_path)
                else:
                    return hostname
        return None
# Load balancer in front of the distributor frontend service.
events {
}

http {
    # Frontend pool; least_conn sends each request to the server with the
    # fewest active connections.
    upstream backend {
        least_conn;
        server distributor.frontend:7000;
    }

    server {
        # "default_server" is the current name of the legacy "default"
        # parameter of the listen directive.
        listen 8000 default_server;
        client_max_body_size 108M;
        access_log /dev/stdout;

        location /top-phrases {
            proxy_pass http://backend;
        }
    }
}
# Nginx load balancer service, published on host port 8000.
# Presumably this entry sits under the compose file's "services:" key --
# indent it to match the surrounding file.
distributor.load-balancer:
  container_name: distributor.load-balancer
  image: nginx:1.19-alpine
  ports:
    - "8000:8000"
  depends_on:
    - distributor.frontend
  volumes:
    # nginx configuration and log directory mounted from the repository.
    - ./distributor/load-balancer/nginx.conf:/etc/nginx/nginx.conf
    - ./distributor/load-balancer/log:/var/log/nginx
# Forward /top-phrases requests to the distributor load balancer on port 8000.
location /top-phrases {
    proxy_pass http://distributor.load-balancer:8000;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment