Created
September 6, 2020 12:47
-
-
Save Iftimie/138994f04460ecdae23814562923ba76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
from hdfs import InsecureClient | |
from hdfs.ext.avro import AvroReader | |
class HdfsClient:
    """Thin wrapper around the ``hdfs`` package's ``InsecureClient`` (WebHDFS).

    Args:
        namenode_host: Hostname of the HDFS namenode; WebHDFS is reached on
            port 9870.
        datanode_host: Hostname of an HDFS datanode. Stored, but not used by
            any method shown here.
    """

    def __init__(self, namenode_host, datanode_host):
        self._namenode_host = namenode_host
        self._datanode_host = datanode_host
        self._client = InsecureClient(f'http://{self._namenode_host}:9870')

    def list(self, path):
        """Return the entry names directly under the HDFS directory *path*."""
        return self._client.list(path)

    def download(self, hdfs_path, local_path, overwrite=True):
        """Copy the HDFS file *hdfs_path* to *local_path* on the local disk.

        Delegates to the wrapped client's ``download``. An existing local file
        is overwritten by default, mirroring ``upload_to_hdfs`` (which always
        sends ``overwrite=true``).

        Returns:
            Whatever the wrapped client's ``download`` returns.
        """
        return self._client.download(hdfs_path, local_path, overwrite=overwrite)

    def upload_to_hdfs(self, local_path, remote_path):
        """Upload *local_path* to *remote_path* with a WebHDFS ``CREATE`` request.

        Any existing file at *remote_path* is overwritten.

        Raises:
            requests.HTTPError: if the final HTTP response has a 4xx/5xx status.
        """
        # NOTE(review): `requests` is assumed to be imported in the part of the
        # file not shown here.
        with open(local_path, 'rb') as f:
            r = requests.put(
                f'http://{self._namenode_host}:9870/webhdfs/v1{remote_path}?op=CREATE&overwrite=true',
                data=f,
            )
        # The response used to be discarded, so a rejected write failed silently.
        r.raise_for_status()
class TrieBuilder:
    """Builds a phrase trie from Avro files in HDFS and uploads the result.

    Only part of the class is shown; ``...`` marks elided code.
    """

    def __init__(self):
        # NOTE(review): os.getenv returns None for an unset variable, which would
        # silently become the host string "None"; confirm these are always set.
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._hdfsClient = HdfsClient(os.getenv("HADOOP_NAMENODE_HOST"), os.getenv("HADOOP_DATANODE_HOST"))
    ...

    def build(self, target_id):
        ...
        trie_remote_hdfs_path = self._get_trie_remote_hdfs_path(target_id)
        self._hdfsClient.upload_to_hdfs(trie_local_file_name, trie_remote_hdfs_path)
        ...

    def _create_trie(self, target_id):
        """Build a ``Trie`` from every Avro record under one target directory.

        Walks ``/phrases/2_targets/<target_id>/<window>/<avro file>`` in HDFS and
        adds each record's ``'phrase'`` field to the trie.

        Args:
            target_id: Name of the target directory under ``/phrases/2_targets``.

        Returns:
            The populated ``Trie``.
        """
        # HDFS paths are always '/'-separated regardless of the host OS, so they
        # are built with plain string formatting rather than os.path.join.
        target_dir = f"/phrases/2_targets/{target_id}"
        trie = Trie()
        for window in self._hdfsClient.list(target_dir):
            window_dir = f"{target_dir}/{window}"
            for avro_file in self._hdfsClient.list(window_dir):
                avro_path = f"{window_dir}/{avro_file}"
                # AvroReader needs the underlying hdfs client, not the wrapper.
                with AvroReader(self._hdfsClient._client, avro_path) as reader:
                    for record in reader:
                        trie.add_phrase(record['phrase'])
        return trie
    ...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Backend:
    """Holds a trie loaded from HDFS.

    Only part of the class is shown; ``...`` marks elided code.
    """

    def __init__(self):
        ...

    def _load_trie(self, target_id):
        """Download the pickled trie for *target_id* and store it in ``self._trie``.

        The file ``/phrases/3_tries/<target_id>/trie.dat`` is downloaded to
        ``trie.dat`` in the current working directory, then unpickled.
        """
        local_path = 'trie.dat'
        trie_hdfs_path = f'/phrases/3_tries/{target_id}/trie.dat'
        # NOTE(review): self._hdfsClient is created in the elided __init__; it must
        # expose download(hdfs_path, local_path) and overwrite an existing
        # trie.dat on repeated loads — confirm.
        self._hdfsClient.download(trie_hdfs_path, local_path)
        # SECURITY: pickle.load can execute arbitrary code embedded in the file;
        # only load tries produced by a trusted writer.
        # The `with` block closes the handle (the old one-liner leaked it).
        with open(local_path, "rb") as trie_file:
            self._trie = pickle.load(trie_file)

    ...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Copy the newest Kafka Connect sink folders into a timestamped target
# directory in HDFS, then publish that target's ID in ZooKeeper.
#
# Fail fast: an unset variable (the old FILES/FOLDERS name mismatch silently
# made `head` take 10 folders instead of 3) or a failed copy must abort the run
# instead of publishing an incomplete target.
set -eu

# How many sink folders to copy into the target.
MAX_NUMBER_OF_INPUT_FOLDERS="3"
# Pause between polls so the wait loops don't hammer the namenode.
POLL_INTERVAL_SECONDS="5"
SINK_DIR="/phrases/1_sink/phrases/"
TARGET_ID=$(date +%Y%m%d_%H%M)
TARGET_DIR="/phrases/2_targets/${TARGET_ID}/"

# Wait for phrases
echo "Checking if ${SINK_DIR} exists.."
while ! hadoop fs -test -d "${SINK_DIR}" ; do
    echo "Waiting for folder ${SINK_DIR} to be created by kafka connect. Please wait.."
    sleep "${POLL_INTERVAL_SECONDS}"
done

echo "Checking for contents in ${SINK_DIR}.."
# `sed 1,1d` drops the first line of `hadoop fs -ls` output (its item-count header).
while [[ $(hadoop fs -ls "${SINK_DIR}" | sed 1,1d) == "" ]] ; do
    echo "Waiting for kafka connect to populate ${SINK_DIR}*. Please wait.."
    sleep "${POLL_INTERVAL_SECONDS}"
done

hadoop fs -mkdir -p "${TARGET_DIR}"

# Column 8 of `hadoop fs -ls` is the path. Take the last N paths in reverse
# lexicographic order (presumably the newest, if folder names encode time),
# then restore ascending order.
INPUT_FOLDERS=$(hadoop fs -ls "${SINK_DIR}" | sed 1,1d | sort -r -k8 | awk '{print $8}' | head -n "${MAX_NUMBER_OF_INPUT_FOLDERS}" | sort)

# Word splitting of INPUT_FOLDERS is intentional: one HDFS path per word
# (assumes the paths contain no whitespace).
for input_folder in ${INPUT_FOLDERS} ; do
    hdfs dfs -cp "${input_folder}" "${TARGET_DIR}"
done

cd /zookeeper/bin
./zkCli.sh -server zookeeper:2181 set /phrases/assembler/last_built_target "${TARGET_ID}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment