Skip to content

Instantly share code, notes, and snippets.

@emrahyildiz
Created March 5, 2018 13:40
Show Gist options
  • Save emrahyildiz/cf516f6df3891b5e04a2b383be89b52d to your computer and use it in GitHub Desktop.
import subprocess
from snakebite.client import HAClient
from snakebite.namenode import Namenode
# Identifier of the storage backend implemented by this module.
MODULE = "HDFS"
class HdfsClient(Client):
    """HDFS helper built on snakebite's ``HAClient`` plus the ``hdfs`` CLI.

    Listing, file creation and deletion go through snakebite. Appending file
    contents is delegated to an ``hdfs dfs -cat | hdfs dfs -appendToFile -``
    pipeline.

    NOTE(review): ``Client`` is not imported in this file. Presumably it is a
    project base class (or ``snakebite.client.Client``); confirm. Its
    ``__init__`` is never called here.
    """

    # Placeholder hostnames carried over from the original sketch (which used
    # undefined names there). Pass real NameNode hosts via ``namenodeHosts``.
    _DEFAULT_NAMENODE_HOSTS = ("hadoop-1", "hadoop-2", "hadoop-n")

    def __init__(self, namenodeHosts=None, port=9000,
                 pathToHadoop="path-to-hadoop", useTrash=True):
        """Create the snakebite HA client.

        :param namenodeHosts: iterable of NameNode hostnames (``str`` or
            ``bytes``). Defaults to the placeholder ``_DEFAULT_NAMENODE_HOSTS``.
        :param port: NameNode RPC port used for every host.
        :param pathToHadoop: directory that contains the ``hdfs`` executable.
        :param useTrash: forwarded to ``HAClient`` as ``use_trash``.
        """
        if namenodeHosts is None:
            namenodeHosts = self._DEFAULT_NAMENODE_HOSTS
        namenodes = [Namenode(self._toText(host), port) for host in namenodeHosts]
        # HAClient's second positional parameter is ``use_trash``, not a port.
        # The original call HAClient(namenodes, 9000) therefore enabled trash by
        # accident. The default of True keeps that behaviour, but now explicitly.
        self.baseClient = HAClient(namenodes, use_trash=useTrash)
        self.pathToHadoop = pathToHadoop

    @staticmethod
    def _toText(host):
        """Return *host* as text, decoding ``bytes`` like the original ``nn.decode()``."""
        return host.decode() if isinstance(host, bytes) else host

    @staticmethod
    def _partNumber(entry):
        """Return N for a listing entry named ``part-N``, or -1 for any other name."""
        name = entry['path'].rsplit('/', 1)[-1]
        suffix = name[len('part-'):] if name.startswith('part-') else ''
        return int(suffix) if suffix.isdigit() else -1

    def merge(self, source, target, maxSize):
        """Append every file found under the entries of *source* to a part file in *target*.

        The part file is chosen by ``_chooseTargetPart``. Each file listed
        beneath each entry of *source* is then appended, in listing order, to
        that single part.

        NOTE(review): the size limit is checked only once, before appending.
        A single merge can therefore grow the chosen part past *maxSize*.

        :param source: HDFS directory whose entries are listed; the children of
            each listed entry are appended.
        :param target: HDFS directory holding ``part-N`` files.
        :param maxSize: threshold compared against the chosen part's ``length``.
        :raises subprocess.CalledProcessError: if either ``hdfs`` command of an
            append pipeline exits non-zero.
        """
        targetPath = self._chooseTargetPart(target, maxSize)
        # One df() RPC per merge instead of two per appended file.
        activeNode = self.getActiveNode()
        fullTargetPath = activeNode + targetPath
        dirList = list(self.baseClient.ls([source]))
        for entry in dirList:
            fileList = list(self.baseClient.ls([entry['path']]))
            for fl in fileList:
                self._appendFile(activeNode + fl['path'], fullTargetPath)

    def _chooseTargetPart(self, target, maxSize):
        """Return the HDFS path of the part file that ``merge`` should append to.

        If *target* is empty, the result is ``<target>/part-1``, which is not
        created here. If the highest-numbered part already exceeds *maxSize*,
        a new empty part is created with ``touchz``.
        """
        parts = list(self.baseClient.ls([target]))
        if not parts:
            return target + '/part-1'
        # HDFS lists names lexicographically (part-10 sorts before part-9), so
        # the last listed entry is not necessarily the newest part.
        lastPart = max(parts, key=self._partNumber)
        if lastPart['length'] > maxSize:
            # Using the highest existing number avoids colliding with gapped numbering.
            nextNumber = max(self._partNumber(lastPart), len(parts)) + 1
            partPath = target + '/part-' + str(nextNumber)
            newPart = list(self.baseClient.touchz([partPath]))[0]
            return newPart['path']
        return lastPart['path']

    def _appendFile(self, sourceUri, targetUri):
        """Stream *sourceUri* into *targetUri* via ``hdfs dfs -cat | hdfs dfs -appendToFile -``."""
        hdfs = self.pathToHadoop + '/hdfs'
        catCmd = [hdfs, 'dfs', '-cat', sourceUri]
        appendCmd = [hdfs, 'dfs', '-appendToFile', '-', targetUri]
        # Argument lists with no shell: HDFS paths containing spaces or shell
        # metacharacters are passed verbatim and cannot inject commands.
        cat = subprocess.Popen(catCmd, stdout=subprocess.PIPE)
        append = subprocess.Popen(appendCmd, stdin=cat.stdout)
        # Close the parent's copy so cat gets SIGPIPE if append exits early.
        cat.stdout.close()
        appendStatus = append.wait()
        catStatus = cat.wait()
        # Previously these failures were ignored, which is dangerous when the
        # caller deletes the sources after a "successful" merge.
        if catStatus != 0:
            raise subprocess.CalledProcessError(catStatus, catCmd)
        if appendStatus != 0:
            raise subprocess.CalledProcessError(appendStatus, appendCmd)

    def delete(self, source):
        """Recursively delete *source* through snakebite.

        ``list()`` drains snakebite's lazy generator so the deletion actually
        runs. Whether entries go to trash follows the ``useTrash`` setting
        given to ``__init__``.
        """
        list(self.baseClient.delete([source], recurse=True))

    def getActiveNode(self):
        """Return the ``'filesystem'`` field of snakebite's ``df()`` result.

        NOTE(review): it is used as a URI prefix for HDFS paths, so it is
        presumably of the form ``hdfs://host[:port]``; confirm.
        """
        return self.baseClient.df()['filesystem']

    def getFullPath(self, fileName):
        """Prefix *fileName* (an absolute HDFS path) with the active node from ``getActiveNode``."""
        return self.getActiveNode() + fileName
def test():
    """Smoke-test connectivity: build a client and print the active NameNode.

    The original discarded the result, so a successful run gave no feedback.
    """
    print(HdfsClient().getActiveNode())
if __name__ == "__main__":
    # Executing the file directly runs the connectivity smoke test.
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment