Created September 6, 2020 12:27
-
-
Save Iftimie/2daad26e604a67bad3ffed2ea45bf534 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Collector(object):
    """Append incoming phrases to time-bucketed files in a shared directory.

    Each phrase is written as one line to a file whose name is the start of
    the current ``bucket_seconds`` window, in epoch seconds, inside
    ``shared_path``. Both attributes can be overridden per instance.
    """

    # Directory the bucket files are written to.
    shared_path = "/app/assembler/collector/shared_phrases"
    # Width of one bucket window, in seconds.
    bucket_seconds = 30

    def collect_phrase(self, phrase):
        """Append *phrase* as a single line to the current bucket file.

        Line breaks inside *phrase* are replaced by spaces. The bucket files
        are read back line by line, so an embedded ``\\n`` or ``\\r`` would
        otherwise split one phrase into several.
        """
        import os
        import time

        now = int(time.time())
        # Floor the timestamp to the start of its window so that every phrase
        # collected within the same window lands in the same file.
        bucket_start = now - now % self.bucket_seconds
        fullpath = os.path.join(self.shared_path, str(bucket_start))
        line = phrase.replace("\r", " ").replace("\n", " ")
        with open(fullpath, "a", encoding="utf-8") as f:
            f.write(line + "\n")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from trie import Trie | |
import pickle | |
import os | |
import time | |
from kazoo.client import KazooClient, DataWatch | |
import shutil | |
_TRIEBUILDER_ROOT = "/app/assembler/triebuilder/"
# Directory with one sub-directory of phrase files per assembled target.
SHARED_PATH = _TRIEBUILDER_ROOT + "shared_targets/"
# Directory the pickled tries are copied into.
SHARED_TRIES = _TRIEBUILDER_ROOT + "shared_tries/"

_ZK_PHRASES_ROOT = "/phrases"
# Znode holding the id of the most recently assembled target (watched here).
ZK_ASSEMBLER_LAST_BUILT_TARGET = _ZK_PHRASES_ROOT + "/assembler/last_built_target"
# Znode this builder writes once the trie for a target has been published.
ZK_NEXT_TARGET = _ZK_PHRASES_ROOT + "/distributor/next_target"
class TrieBuilder:
    """Build a trie for each newly assembled target and publish it.

    The builder watches ``ZK_ASSEMBLER_LAST_BUILT_TARGET``. When it holds a
    target id, the builder:

    * reads every phrase file under ``SHARED_PATH + target_id``;
    * pickles the resulting ``Trie`` into ``SHARED_TRIES``;
    * writes the id to ``ZK_NEXT_TARGET``.
    """

    def __init__(self):
        import logging

        # Previously referenced in build() but never assigned, so any build
        # failure raised AttributeError from inside the error handler.
        self._logger = logging.getLogger(__name__)
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._datawatch = None

    def start(self):
        """Connect to ZooKeeper and start watching for assembled targets."""
        self._zk.start()
        # Keep a reference so the watch is owned by the builder.
        self._datawatch = DataWatch(
            client=self._zk,
            path=ZK_ASSEMBLER_LAST_BUILT_TARGET,
            func=self._on_assembler_last_built_target_changed,
        )

    def _on_assembler_last_built_target_changed(self, data, stat, event=None):
        """DataWatch callback: build the trie for the announced target id."""
        # None: node missing; b'': node exists but no target announced yet.
        # Building '' would read the whole SHARED_PATH directory.
        if not data:
            return
        self.build(data.decode())

    def build(self, target_id):
        """Build, persist and publish the trie for *target_id*.

        Any error is logged with its traceback rather than raised, because
        this runs from the ZooKeeper watch callback.
        """
        try:
            trie = self._create_trie(target_id)
            trie_local_file_name = "trie.dat"
            with open(trie_local_file_name, "wb") as f:
                pickle.dump(trie, f)
            shared_path = os.path.join(SHARED_TRIES, f"trie_{target_id}.dat")
            shutil.copyfile(trie_local_file_name, shared_path)
            # Only announce the target after the file is fully in place.
            self._register_next_target_zookeeper(target_id)
        except Exception:
            self._logger.exception("Error while building tree for target %s", target_id)

    def _create_trie(self, target_id):
        """Return a ``Trie`` holding every non-empty line of the target's files."""
        trie = Trie()
        target_dir = SHARED_PATH + target_id
        for file_name in os.listdir(target_dir):
            with open(os.path.join(target_dir, file_name), "r", encoding="utf-8") as f:
                for line in f:
                    # Iterating a file keeps the line terminator; strip it so
                    # it does not become part of the phrase.
                    phrase = line.rstrip("\n")
                    if phrase:
                        trie.add_phrase(phrase)
        return trie

    def _register_next_target_zookeeper(self, target_id):
        """Write *target_id* to ``ZK_NEXT_TARGET``, creating the node if needed."""
        self._zk.ensure_path(ZK_NEXT_TARGET)
        self._zk.set(ZK_NEXT_TARGET, target_id.encode())
if __name__ == '__main__':
    builder = TrieBuilder()
    builder.start()
    print("Trie builder started")
    # start() registers a ZooKeeper DataWatch whose callback does the work;
    # the main thread only needs to stay alive.
    while True:
        time.sleep(5)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import falcon | |
import json | |
import logging | |
import os | |
import pickle | |
from trie import Trie | |
from kazoo.client import KazooClient, DataWatch | |
# Znode holding the id of the next trie the backend should serve.
ZK_NEXT_TARGET = '/phrases/distributor/next_target'


class Backend:
    """Serve top-phrase lookups from the most recently announced trie.

    The backend watches ``ZK_NEXT_TARGET``. Whenever it changes to a non-empty
    target id, the pickled trie ``trie_<target_id>.dat`` is loaded from
    ``_shared_tries_path`` and swapped in. On any load failure the current
    trie is kept.
    """

    # Directory the pickled tries are read from; overridable per instance.
    _shared_tries_path = "/app/distributor/backend/shared_tries"
    # Class-level logger: it was previously referenced but never assigned.
    _logger = logging.getLogger(__name__)

    def __init__(self):
        self._trie = Trie()
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._datawatch = None

    def start(self):
        """Connect to ZooKeeper and start watching ``ZK_NEXT_TARGET``."""
        self._zk.start()
        # Keep a reference so the watch is owned by the backend.
        self._datawatch = DataWatch(client=self._zk, path=ZK_NEXT_TARGET, func=self._on_next_target_changed)

    def _on_next_target_changed(self, data, stat, event=None):
        """DataWatch callback: load the trie for the announced target id."""
        # None: node missing; b'': node exists but no target announced yet.
        if not data:
            return
        self._load_trie(data.decode())

    def top_phrases_for_prefix(self, prefix):
        """Delegate the lookup to the currently loaded trie."""
        return self._trie.top_phrases_for_prefix(prefix)

    def _load_trie(self, target_id):
        """Replace the served trie with ``trie_<target_id>.dat`` if it loads."""
        trie_path = os.path.join(self._shared_tries_path, f"trie_{target_id}.dat")
        try:
            # NOTE(review): pickle.load can execute arbitrary code. This is only
            # safe if this directory is writable solely by the trusted trie
            # builder; confirm.
            with open(trie_path, 'rb') as f:
                trie = pickle.load(f)
        except FileNotFoundError:
            self._logger.warning(f"File does not exist {trie_path}")
            return
        except Exception:
            # A partial or corrupt file must not take down the watch callback;
            # keep serving the previous trie.
            self._logger.exception("Could not load trie from %s", trie_path)
            return
        self._trie = trie
class TopPhrasesResource(object):
    """Resource answering top-phrase queries through its own ``Backend``.

    NOTE(review): presumably a Falcon resource, since this module imports
    falcon; the request handlers are not shown in this excerpt.
    """

    def __init__(self):
        backend = Backend()
        backend.start()
        self._backend = backend

    # NOTE(review): the remainder of this class is elided in this excerpt.
    ...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Assemble a new target and announce it on ZooKeeper:
#   1. copy the newest phrase bucket files into a fresh target directory;
#   2. set /phrases/assembler/last_built_target to the new target id.
# Abort on any error, so that a failed copy never gets announced.
set -eu

MAX_NUMBER_OF_INPUT_FILES="3"
TARGET_ID=$(date +%Y%m%d_%H%M)
SOURCE=/app/assembler/tasks/shared_phrases/
DESTINATION=/app/assembler/tasks/shared_targets/${TARGET_ID}

# Presumably the bucket files are named by epoch second (fixed width), so a
# reverse lexical sort lists the newest first. Parsing ls is acceptable only
# because such names contain no whitespace.
INPUT_FILES=$(ls -1 "${SOURCE}" | sort -r | head -n "${MAX_NUMBER_OF_INPUT_FILES}")

# Announcing an empty target would lead to an empty trie being built and
# served; keep the previous target instead.
if [ -z "${INPUT_FILES}" ]; then
    echo "No phrase files in ${SOURCE}; not publishing a new target" >&2
    exit 0
fi

mkdir -p "${DESTINATION}"
for input_file in ${INPUT_FILES} ; do
    cp "${SOURCE}${input_file}" "${DESTINATION}/${input_file}"
done

cd /zookeeper/bin
./zkCli.sh -server zookeeper:2181 set /phrases/assembler/last_built_target "${TARGET_ID}"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Single ZooKeeper node holding the /phrases coordination znodes.
zookeeper:
  image: wurstmeister/zookeeper:latest
  hostname: zookeeper
  container_name: zookeeper
  ports: ["2181:2181"]
  environment:
    # NOTE(review): these are the variable names Confluent's cp-zookeeper
    # image reads; confirm wurstmeister/zookeeper honours them.
    - ZOOKEEPER_CLIENT_PORT=2181
    - ZOOKEEPER_TICK_TIME=2000
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build the assembler tasks image and run it once against the shared
# phrase and target directories.
do_tasks:
	sudo docker build -t lopespm/tasks ./assembler/tasks
	sudo docker run \
		--network ${DOCKER_NETWORK} \
		-v "$$(pwd)/shared/shared_phrases":"/app/assembler/tasks/shared_phrases" \
		-v "$$(pwd)/shared/shared_targets":"/app/assembler/tasks/shared_targets" \
		lopespm/tasks

# Wait until ZooKeeper reports standalone mode, then create the znodes,
# parents first. The loop stops on the first failing create.
setup: ZK_CLI = sudo docker exec zookeeper ./bin/zkCli.sh -server localhost:2181
setup: ZK_NODES = /phrases /phrases/assembler /phrases/assembler/last_built_target /phrases/distributor /phrases/distributor/current_target /phrases/distributor/next_target
setup:
	while [ "$$(echo "stat" | nc localhost 2181 | grep Mode)" != "Mode: standalone" ] ; do \
		echo "Waiting for zookeeper to come online" ; \
		sleep 2 ; \
	done
	for node in $(ZK_NODES) ; do \
		$(ZK_CLI) create $$node "" || exit 1 ; \
	done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.