@Iftimie · Created September 6, 2020 12:47
...
import os
import pickle
import requests
from hdfs import InsecureClient
from hdfs.ext.avro import AvroReader
from kazoo.client import KazooClient

class HdfsClient:
    def __init__(self, namenode_host, datanode_host):
        self._namenode_host = namenode_host
        self._datanode_host = datanode_host
        self._client = InsecureClient(f'http://{self._namenode_host}:9870')

    def list(self, path):
        return self._client.list(path)

    def upload_to_hdfs(self, local_path, remote_path):
        # WebHDFS CREATE replies with a 307 redirect to a datanode. Handling
        # the redirect explicitly follows the documented two-step flow and
        # avoids relying on the HTTP client to rewind and re-send the file body.
        url = f'http://{self._namenode_host}:9870/webhdfs/v1{remote_path}?op=CREATE&overwrite=true'
        r = requests.put(url, allow_redirects=False)
        with open(local_path, 'rb') as f:
            requests.put(r.headers['Location'], data=f)

    def download(self, remote_path, local_path):
        # Counterpart of upload_to_hdfs, used by Backend._load_trie below;
        # delegates to the hdfs library's client.
        self._client.download(remote_path, local_path, overwrite=True)
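For orientation, a minimal usage sketch of the client above. The 'namenode' and 'datanode' host names and the target path are illustrative assumptions, standing in for whatever the HADOOP_NAMENODE_HOST and HADOOP_DATANODE_HOST environment variables resolve to in the actual setup.

# Sketch (assumed host names and path, not from the gist):
client = HdfsClient('namenode', 'datanode')
client.upload_to_hdfs('trie.dat', '/phrases/3_tries/20200906_1247/trie.dat')
print(client.list('/phrases/3_tries'))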
class TrieBuilder:
    def __init__(self):
        self._zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
        self._hdfsClient = HdfsClient(os.getenv("HADOOP_NAMENODE_HOST"), os.getenv("HADOOP_DATANODE_HOST"))
    ...
    def build(self, target_id):
        ...
        trie_remote_hdfs_path = self._get_trie_remote_hdfs_path(target_id)
        self._hdfsClient.upload_to_hdfs(trie_local_file_name, trie_remote_hdfs_path)
        ...

    def _create_trie(self, target_id):
        # Merge the phrases from every Avro part file of every sliding-window
        # folder under the target into a single trie.
        sliding_windows = self._hdfsClient.list("/phrases/2_targets/" + target_id)
        trie = Trie()
        for window in sliding_windows:
            window_path = "/phrases/2_targets/" + target_id + "/" + window
            for avro_file in self._hdfsClient.list(window_path):
                avro_path = os.path.join(window_path, avro_file)
                with AvroReader(self._hdfsClient._client, avro_path) as reader:
                    for record in reader:
                        trie.add_phrase(record['phrase'])
        return trie
...
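The Trie class itself is not part of this excerpt. A minimal sketch of the interface _create_trie relies on (add_phrase, plus plain attributes so the object pickles cleanly) could look like the following; it is an illustrative stand-in, not the gist's actual implementation.

class Trie:
    # Illustrative stand-in (assumption): one node per character, with a
    # terminal flag marking the end of a stored phrase.
    def __init__(self):
        self._children = {}      # char -> Trie
        self._terminal = False   # True if a phrase ends at this node

    def add_phrase(self, phrase):
        node = self
        for ch in phrase:
            node = node._children.setdefault(ch, Trie())
        node._terminal = True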
class Backend:
    def __init__(self):
        ...

    def _load_trie(self, target_id):
        local_path = 'trie.dat'
        trie_hdfs_path = f'/phrases/3_tries/{target_id}/trie.dat'
        self._hdfsClient.download(trie_hdfs_path, local_path)
        # Use a context manager so the file handle is closed after unpickling.
        with open(local_path, 'rb') as f:
            self._trie = pickle.load(f)
...
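To connect the backend to the publishing step in the script below, here is a hedged sketch of how it could watch the last_built_target znode with kazoo and reload the trie whenever the assembler publishes a new target id. The znode path comes from the gist; the watch itself, and the 'backend' instance name, are assumptions.

# Sketch (assumption, not from the gist): react to new target ids.
zk = KazooClient(hosts=f'{os.getenv("ZOOKEEPER_HOST")}:2181')
zk.start()

@zk.DataWatch('/phrases/assembler/last_built_target')
def on_new_target(data, stat):
    if data:
        backend._load_trie(data.decode('utf-8'))  # 'backend' is a Backend instance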
#!/bin/bash
MAX_NUMBER_OF_INPUT_FOLDERS="3"
TARGET_ID=$(date +%Y%m%d_%H%M)

# Wait for phrases
echo "Checking if /phrases/1_sink/phrases/ exists.."
while ! hadoop fs -test -d "/phrases/1_sink/phrases/" ; do echo "Waiting for folder /phrases/1_sink/phrases/ to be created by kafka connect. Please wait.."; sleep 5; done
echo "Checking for contents in /phrases/1_sink/phrases/.."
while [[ $(hadoop fs -ls /phrases/1_sink/phrases/ | sed 1,1d) == "" ]] ; do echo "Waiting for kafka connect to populate /phrases/1_sink/phrases/*. Please wait.."; sleep 5; done

# Copy the newest input folders into a fresh target folder
hadoop fs -mkdir -p /phrases/2_targets/${TARGET_ID}/
INPUT_FOLDERS=$(hadoop fs -ls /phrases/1_sink/phrases/ | sed 1,1d | sort -r -k8 | awk '{print $8}' | head -${MAX_NUMBER_OF_INPUT_FOLDERS} | sort)
for input_folder in ${INPUT_FOLDERS} ; do
    hdfs dfs -cp ${input_folder} /phrases/2_targets/${TARGET_ID}/ ;
done

# Publish the new target id. Note that zkCli's "set" fails if the znode does
# not already exist, so /phrases/assembler/last_built_target is assumed to
# have been created beforehand.
cd /zookeeper/bin
./zkCli.sh -server zookeeper:2181 set /phrases/assembler/last_built_target ${TARGET_ID}
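Since "set" requires an existing znode, a one-time bootstrap could create the path before the assembler first runs. A minimal sketch with kazoo, under the assumption that the znode is not created anywhere else:

# Sketch (assumption): ensure the znode exists once, so that zkCli's
# "set" in the script above has something to update.
zk = KazooClient(hosts='zookeeper:2181')
zk.start()
zk.ensure_path('/phrases/assembler/last_built_target')
zk.stop()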