Skip to content

Instantly share code, notes, and snippets.

@Iftimie
Created September 6, 2020 12:27
Show Gist options
  • Save Iftimie/2daad26e604a67bad3ffed2ea45bf534 to your computer and use it in GitHub Desktop.
Save Iftimie/2daad26e604a67bad3ffed2ea45bf534 to your computer and use it in GitHub Desktop.
class Collector(object):
    """Append collected phrases to time-bucketed files on a shared volume.

    Every phrase collected during the same ``bucket_seconds`` window is
    appended, one per line, to a file whose name is the start of that window
    in whole Unix seconds.
    """

    def collect_phrase(
        self,
        phrase,
        shared_path="/app/assembler/collector/shared_phrases",
        bucket_seconds=30,
    ):
        """Append ``phrase`` as one line to the file for the current bucket.

        Args:
            phrase: The phrase to record. Carriage returns and newlines inside
                it are replaced by spaces so that one phrase always occupies
                exactly one line of the line-oriented bucket file.
            shared_path: Directory holding the bucket files.
            bucket_seconds: Width of a time bucket, in seconds.

        Raises:
            ValueError: If ``bucket_seconds`` is not positive.
        """
        if bucket_seconds <= 0:
            raise ValueError(
                f"bucket_seconds must be positive, got {bucket_seconds!r}"
            )
        current_time = int(time.time())
        # Round down to the start of the current bucket.
        bucket_start = current_time - current_time % bucket_seconds
        fullpath = os.path.join(shared_path, str(bucket_start))
        # Readers split the file on line breaks; an embedded newline would
        # otherwise turn one phrase into several.
        single_line = phrase.replace("\r", " ").replace("\n", " ")
        # Explicit encoding: the default is platform/locale dependent.
        with open(fullpath, "a", encoding="utf-8") as f:
            f.write(single_line + "\n")
import logging
import os
import pickle
import shutil
import time

from kazoo.client import KazooClient, DataWatch

from trie import Trie
# Directory containing one sub-directory of phrase files per target id.
SHARED_PATH = "/app/assembler/triebuilder/shared_targets/"
# Directory the pickled tries are published to.
SHARED_TRIES = "/app/assembler/triebuilder/shared_tries/"
# znode whose value is the id of the most recently assembled target.
ZK_ASSEMBLER_LAST_BUILT_TARGET = '/phrases/assembler/last_built_target'
# znode the builder writes a target id to once its trie has been published.
ZK_NEXT_TARGET = '/phrases/distributor/next_target'


class TrieBuilder:
    """Build and publish a pickled Trie for each newly assembled target.

    Watches ``ZK_ASSEMBLER_LAST_BUILT_TARGET``. When it names a target, every
    file under ``SHARED_PATH/<target_id>`` is read into a ``Trie``. The trie is
    pickled to ``SHARED_TRIES/trie_<target_id>.dat`` and ``<target_id>`` is
    then written to ``ZK_NEXT_TARGET``.
    """

    # Class-level so every instance (and the error path) always has a logger.
    _logger = logging.getLogger(__name__)

    def __init__(self):
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._last_built_target_watch = None

    def start(self):
        """Connect to ZooKeeper and start watching for assembled targets."""
        self._zk.start()
        # Keep a reference to the watch instead of discarding it.
        self._last_built_target_watch = DataWatch(
            client=self._zk,
            path=ZK_ASSEMBLER_LAST_BUILT_TARGET,
            func=self._on_assembler_last_built_target_changed,
        )

    def _on_assembler_last_built_target_changed(self, data, stat, event=None):
        """Watch callback: build the trie for the target id stored in ``data``."""
        # None (no node) and b"" (node created empty by setup) carry no
        # target. Building "" would read the whole SHARED_PATH directory.
        if not data:
            return
        self.build(data.decode())

    def build(self, target_id):
        """Build, pickle and publish the trie for ``target_id``.

        Any failure is logged with its traceback rather than propagated, and
        the new target is then not registered in ZooKeeper.
        """
        try:
            trie = self._create_trie(target_id)
            trie_local_file_name = "trie.dat"
            # Close (and thus flush) the pickle before copying it.
            with open(trie_local_file_name, "wb") as f:
                pickle.dump(trie, f)
            shared_path = os.path.join(SHARED_TRIES, f"trie_{target_id}.dat")
            shutil.copyfile(trie_local_file_name, shared_path)
            self._register_next_target_zookeeper(target_id)
        except Exception:
            self._logger.exception(
                "Error while building trie for target %s", target_id
            )

    def _create_trie(self, target_id):
        """Return a Trie holding every non-empty line of the target's files."""
        trie = Trie()
        target_dir = os.path.join(SHARED_PATH, target_id)
        for file_name in sorted(os.listdir(target_dir)):
            path = os.path.join(target_dir, file_name)
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    # Phrase files hold one phrase per line; drop the line
                    # terminator so it does not become part of the phrase.
                    phrase = line.rstrip("\n")
                    if phrase:
                        trie.add_phrase(phrase)
        return trie

    def _register_next_target_zookeeper(self, target_id):
        """Write ``target_id`` to ``ZK_NEXT_TARGET``, creating the node if needed."""
        self._zk.ensure_path(ZK_NEXT_TARGET)
        self._zk.set(ZK_NEXT_TARGET, target_id.encode())
if __name__ == '__main__':
    # Script entry point: start the builder, then keep the process alive.
    builder = TrieBuilder()
    builder.start()
    print("Trie builder started")
    # All work happens in the ZooKeeper watch callback registered by start();
    # the main thread only has to stay alive.
    keep_alive_seconds = 5
    while True:
        time.sleep(keep_alive_seconds)
import falcon
import json
import logging
import os
import pickle
from trie import Trie
from kazoo.client import KazooClient, DataWatch
# znode holding the id of the trie the backend should serve next.
ZK_NEXT_TARGET = '/phrases/distributor/next_target'


class Backend:
    """Answer prefix lookups from the most recently published Trie.

    Watches ``ZK_NEXT_TARGET``. Whenever it names a target, the pickled trie
    ``trie_<target_id>.dat`` is loaded from the shared tries directory and
    swapped in. On any load failure the current trie is kept.
    """

    # Class-level so every instance (and every error path) has a logger.
    _logger = logging.getLogger(__name__)
    # Directory the pickled tries are read from.
    _SHARED_TRIES_DIR = "/app/distributor/backend/shared_tries"

    def __init__(self):
        self._trie = Trie()
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._next_target_watch = None

    def start(self):
        """Connect to ZooKeeper and start watching ``ZK_NEXT_TARGET``."""
        self._zk.start()
        # Keep a reference to the watch instead of discarding it.
        self._next_target_watch = DataWatch(
            client=self._zk,
            path=ZK_NEXT_TARGET,
            func=self._on_next_target_changed,
        )

    def _on_next_target_changed(self, data, stat, event=None):
        """Watch callback: load the trie for the target id in ``data``."""
        # None (no node) and b"" (node created empty by setup) carry no target.
        if not data:
            return
        self._load_trie(data.decode())

    def top_phrases_for_prefix(self, prefix):
        """Delegate the lookup to the currently loaded trie."""
        return self._trie.top_phrases_for_prefix(prefix)

    def _load_trie(self, target_id):
        """Replace the served trie with ``trie_<target_id>.dat`` if it loads."""
        trie_path = os.path.join(self._SHARED_TRIES_DIR, f"trie_{target_id}.dat")
        try:
            with open(trie_path, "rb") as f:
                # NOTE(security): unpickling runs arbitrary code embedded in the
                # file. This is only acceptable while the shared volume is
                # writable solely by the trie builder.
                trie = pickle.load(f)
        except FileNotFoundError:
            self._logger.warning("File does not exist %s", trie_path)
            return
        except (pickle.UnpicklingError, EOFError):
            # Truncated/corrupt file: keep serving the current trie.
            self._logger.exception("Could not unpickle trie %s", trie_path)
            return
        # Swap only after a successful load.
        self._trie = trie
class TopPhrasesResource(object):
    """Own a started Backend used to answer top-phrase lookups.

    NOTE(review): the rest of this class is elided with ``...`` in the source.
    It is presumably the Falcon request handler(s) calling
    ``Backend.top_phrases_for_prefix``; confirm against the full file.
    """

    def __init__(self):
        # Create the backend and start its ZooKeeper watch immediately.
        self._backend = Backend()
        self._backend.start()

    ...
#!/bin/bash
# Snapshot the newest phrase files into a fresh target directory and announce
# the target to the trie builder through ZooKeeper.

MAX_NUMBER_OF_INPUT_FILES="3"
TARGET_ID=$(date +%Y%m%d_%H%M)
SOURCE=/app/assembler/tasks/shared_phrases/
DESTINATION=/app/assembler/tasks/shared_targets/${TARGET_ID}

# Presumably the file names are the collector's Unix-second bucket starts, so a
# reverse lexical sort lists the newest buckets first. TODO confirm.
INPUT_FILES=$(ls -1 "${SOURCE}" | sort -r | head -n "${MAX_NUMBER_OF_INPUT_FILES}")

# Publishing a target with no phrase files would make the trie builder build,
# and the backend then serve, an empty trie.
if [ -z "${INPUT_FILES}" ]; then
    echo "No phrase files found in ${SOURCE}; nothing to publish" >&2
    exit 0
fi

mkdir -p "${DESTINATION}" || exit 1
# Intentionally unquoted: word-split the newline-separated file list.
for input_file in ${INPUT_FILES} ; do
    cp "${SOURCE}${input_file}" "${DESTINATION}/${input_file}" || exit 1
done

cd /zookeeper/bin || exit 1
./zkCli.sh -server zookeeper:2181 set /phrases/assembler/last_built_target "${TARGET_ID}"
zookeeper:
  # Hostname "zookeeper" is what the tasks script connects to (zookeeper:2181).
  image: wurstmeister/zookeeper:latest
  container_name: zookeeper
  hostname: zookeeper
  # Published on the host so `make setup` can reach it via localhost:2181.
  ports:
    - "2181:2181"
  # NOTE(review): these ZOOKEEPER_* names follow the Confluent cp-zookeeper
  # convention; confirm the wurstmeister image actually reads them.
  environment:
    - ZOOKEEPER_CLIENT_PORT=2181
    - ZOOKEEPER_TICK_TIME=2000
# Build the tasks image and run it once against the shared phrase/target dirs.
# --rm: without it every invocation leaves another stopped container behind.
do_tasks:
	sudo docker build -t lopespm/tasks ./assembler/tasks
	sudo docker run --rm \
		--network ${DOCKER_NETWORK} \
		-v "$$(pwd)/shared/shared_phrases":"/app/assembler/tasks/shared_phrases" \
		-v "$$(pwd)/shared/shared_targets":"/app/assembler/tasks/shared_targets" \
		lopespm/tasks
# Wait for the standalone ZooKeeper to answer, then create the znodes used by
# the assembler and distributor. Parents are listed before their children, and
# the first failing create aborts the target.
setup:
	until [ "$$(echo "stat" | nc localhost 2181 | grep Mode)" = "Mode: standalone" ] ; do \
		echo "Waiting for zookeeper to come online" ; \
		sleep 2 ; \
	done
	for znode in \
		/phrases \
		/phrases/assembler \
		/phrases/assembler/last_built_target \
		/phrases/distributor \
		/phrases/distributor/current_target \
		/phrases/distributor/next_target ; do \
		sudo docker exec zookeeper ./bin/zkCli.sh -server localhost:2181 create $$znode "" || exit 1 ; \
	done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment