Skip to content

Instantly share code, notes, and snippets.

View Iftimie's full-sized avatar
🐒

Iftimie Florentin Alexandru Iftimie

🐒
  • Bucharest, Romania
View GitHub Profile
@Iftimie
Iftimie / main.py
Created September 6, 2020 11:38
Simplified main.py
import falcon
import json
import logging
import os
import pickle
from trie import Trie
# from https://github.com/Iftimie/autocomplete/blob/7758e6d93cc9980749dde33720ce9ee8cda8c41b/main.py
class Collector(object):
@Iftimie
Iftimie / assembler_collector.py
Created September 6, 2020 11:52
Split the monolith into separate services
from trie import Trie
import pickle
import logging
import falcon
import json
class Collector(object):
def collect_phrase(self, phrase):
@Iftimie
Iftimie / assembler_collector.py
Created September 6, 2020 12:02
Add signaling between collector and triebuilder
class Collector(object):
def collect_phrase(self, phrase):
shared_path = "/app/assembler/collector/shared_phrases"
sorted_files = sorted(os.listdir(shared_path))
current_time = time.time()
seconds_30 = 30
if not sorted_files:
curfile = str(int(current_time))
elif int(sorted_files[-1])+seconds_30 < current_time:
def on_post(self, req, resp):
phrase_file = req.params['phrase_file']
...
trie_local_file_name = f"/app/assembler/triebuilder/shared_data/trie_{phrase_file}.dat"
pickle.dump(trie, open(trie_local_file_name, "wb"))
...
...
trie_local_file_name = f"/app/assembler/triebuilder/shared_data/trie_{phrase_file}.dat"
pickle.dump(trie, open(trie_local_file_name, "wb"))
requests.post(f"http://distributor.backend:6000/reload-trie?trie_file=trie_{phrase_file}.dat")
...
# Build the batch-tasks image and run it on the shared Docker network,
# bind-mounting the host's shared phrase/target directories into the
# container so the tasks service can exchange files with the collector.
# NOTE(review): ${DOCKER_NETWORK} is expected to be defined elsewhere in
# this Makefile (not visible in this fragment) — confirm before running.
# ($$ escapes to a single $ so $(pwd) is expanded by the shell, not make.)
do_tasks:
sudo docker build -t lopespm/tasks ./assembler/tasks
sudo docker run \
--network ${DOCKER_NETWORK} \
-v "$$(pwd)/shared/shared_phrases":"/app/assembler/tasks/shared_phrases" \
-v "$$(pwd)/shared/shared_targets":"/app/assembler/tasks/shared_targets" \
lopespm/tasks
setup:
@Iftimie
Iftimie / consumer.sh
Created September 6, 2020 12:35
Collector -> message broker -> kafka-connect -> HDFS
#!/bin/sh
curl -s \
-X "POST" "http://localhost:8083/connectors/" \
-H "Content-Type: application/json" \
--data '{
"name": "hdfs-sink-phrases",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
...
from hdfs import InsecureClient
from hdfs.ext.avro import AvroReader
class HdfsClient:
def __init__(self, namenode_host, datanode_host):
    """Store the HDFS host names and open a WebHDFS client to the namenode.

    NOTE(review): port 9870 is hard-coded; this assumes the default
    Hadoop 3 namenode HTTP port — confirm against the deployment.
    """
    # The two host names are independent; keep both, then build the client
    # URL from the namenode host.
    self._datanode_host = datanode_host
    self._namenode_host = namenode_host
    self._client = InsecureClient(f'http://{self._namenode_host}:9870')
...
# Lexicographic key-range partitioning: the key space is split into two
# ranges at the literal string 'mod', giving one partition per worker.
min_lexicographic_char = chr(0)  # '\x00' — sorts before every other character
# NOTE(review): chr(255) is '\xff', not the largest Unicode code point
# (that is chr(0x10FFFF)); a phrase starting with a character above U+00FF
# would sort past the second partition's upper bound — confirm the input
# alphabet is limited to Latin-1 / ASCII.
max_lexicographic_char = chr(255)
PARTITIONS = (min_lexicographic_char, 'mod'), ('mod', max_lexicographic_char)
...
class TrieBuilder:
def build(self, target_id):
for start, end in PARTITIONS:
...
class Backend:
def __init__(self):
self._trie = Trie()
self.partition_index = self.find_self_partition()
def find_self_partition(self):
cli = docker.Client(base_url='unix://var/run/docker.sock')
all_containers = cli.containers()
# filter out ourself by HOSTNAME