Skip to content

Instantly share code, notes, and snippets.

@xiaoshuai
Created November 16, 2018 16:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xiaoshuai/da3f0195177abcad95a9fe86f4a19f28 to your computer and use it in GitHub Desktop.
Save xiaoshuai/da3f0195177abcad95a9fe86f4a19f28 to your computer and use it in GitHub Desktop.
hdfs.py: call HDFS shell commands from Python 3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""hdfs.py: call HDFS shell commands (``hdfs dfs ...``) from Python 3."""
# The shebang names python3 explicitly (PEP 394): this module uses
# Python 3-only syntax, so an interpreter resolved as plain ``python`` could
# be Python 2 and would fail with a SyntaxError.
__author__ = "xiaoshuai, http://github.com/xiaoshuai"
__license__ = "GPL"
__version__ = "1.0.1"
import os
import re
import subprocess
import sys
from subprocess import check_output, CalledProcessError
# True on any Python 3 (or newer) interpreter; kept as a public module flag.
PY3 = sys.version_info[0] >= 3
# Enforce the minimum version that the message below advertises. This guard
# cannot catch Python 2: the module uses Python 3-only syntax (e.g.
# ``raise ... from`` and ``print(..., file=...)``), so Python 2 fails to
# compile the file before this line ever runs.
if sys.version_info < (3, 6):
    print('Only Python3.6 above supported.', file=sys.stderr)
    sys.exit(1)
class HdfsCommandError(RuntimeError):
    """Error raised when an ``hdfs`` command fails or a precondition is not met.

    The text is kept on ``self.message``. Note that constructing the exception
    also writes the text to stderr, even if the caller later catches it.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
        # Side effect on construction (not on raise): echo the text to stderr.
        print(message, file=sys.stderr)
class HdfsClient:
    """Drive HDFS through the ``hdfs dfs`` command-line tool.

    Each operation runs ``hadoop_cmd`` followed by a ``-<subcommand>`` and its
    arguments as a child process. The argument list is passed directly (no
    shell), so paths are handed over verbatim.

    :param hadoop_cmd: command prefix, either a whitespace-separated string
        (default ``'hdfs dfs'``) or a ready-made list/tuple of arguments.
    :param debug: when true, print each command line to stderr before running
        it (applies to commands run through the private ``__call_check``).
    """

    def __init__(self, hadoop_cmd='hdfs dfs', debug=False):
        if isinstance(hadoop_cmd, (list, tuple)):
            self.hadoop_cmd = list(hadoop_cmd)
        else:
            # split() rather than split(' '): repeated spaces must not produce
            # empty argv entries.
            self.hadoop_cmd = hadoop_cmd.split()
        self.debug = debug

    @staticmethod
    def __local_path_exists(local_path):
        """Return True if *local_path* is an existing local regular file or directory."""
        return os.path.isfile(local_path) or os.path.isdir(local_path)

    def __call_check(self, cmd):
        """Run *cmd* and return its stdout decoded as UTF-8.

        The child's stderr is not captured, so it goes to this process's stderr.

        :raises HdfsCommandError: if the command exits with a non-zero status.
        """
        if self.debug:
            debug_message = 'hdfs command "{0}" will call.'.format(subprocess.list2cmdline(cmd))
            print(debug_message, file=sys.stderr)
        try:
            stdout = check_output(cmd)
        except CalledProcessError as err:
            message = 'hdfs command "{0}" failed with error code {1}.'.format(' '.join(err.cmd), err.returncode)
            raise HdfsCommandError(message) from err
        return stdout.decode('utf-8')

    def exists(self, path):
        """Return whether *path* exists on HDFS, probed with ``-stat``.

        :returns: True if ``-stat`` exits 0. False if it fails and some stderr
            line ends with "No such file or directory".
        :raises HdfsCommandError: if ``-stat`` fails for any other reason.
        """
        cmd = self.hadoop_cmd + ['-stat', path]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                close_fds=True, universal_newlines=True)
        _, stderr = proc.communicate()
        if proc.returncode == 0:
            return True
        if any(line.endswith('No such file or directory') for line in stderr.split('\n')):
            return False
        message = 'run_cmd="{0}" ret_code={1} error.'.format(subprocess.list2cmdline(cmd), proc.returncode)
        raise HdfsCommandError(message)

    def move(self, path, dest):
        """Move one HDFS path, or a list/tuple of paths, to *dest* with ``-mv``.

        For an absolute *dest*, its parent directory is created first if it is
        missing. If *dest* is empty, nothing runs and None is returned.
        Otherwise the command's stdout is returned.
        """
        if not dest:
            return None
        if dest.startswith('/'):
            parent_dir = os.path.dirname(dest)
            if parent_dir and not self.exists(parent_dir):
                self.mkdir(parent_dir)
        # list() so a tuple of sources can be concatenated onto the argv list
        # (list + tuple raises TypeError).
        sources = list(path) if isinstance(path, (list, tuple)) else [path]
        return self.__call_check(self.hadoop_cmd + ['-mv'] + sources + [dest])

    def rename(self, path, dest):
        """Alias for :meth:`move`."""
        return self.move(path, dest)

    def rename_dont_move(self, path, dest):
        """Move *path* to *dest*, but raise HdfsCommandError if *dest* already exists.

        The existence check and the move are separate commands, so this is not
        atomic.
        """
        if self.exists(dest):
            raise HdfsCommandError('destination file already exist.')
        return self.move(path, dest)

    def remove(self, path, recursive=True, skip_trash=False):
        """Delete *path* with ``-rm``.

        Adds ``-r`` when *recursive* and ``-skipTrash`` when *skip_trash*.
        Returns the command's stdout.
        """
        cmd = self.hadoop_cmd + ['-rm']
        if recursive:
            cmd.append('-r')
        if skip_trash:
            cmd.append('-skipTrash')
        cmd.append(path)
        return self.__call_check(cmd)

    def chmod(self, path, permissions, recursive=False):
        """Change the mode of *path* with ``-chmod`` (``-R`` when *recursive*).

        *permissions* is passed through as given, e.g. the string ``'755'``.
        """
        cmd = self.hadoop_cmd + ['-chmod'] + (['-R'] if recursive else []) + [permissions, path]
        return self.__call_check(cmd)

    def chown(self, path, owner, group, recursive=False):
        """Change the ownership of *path* with ``-chown`` (``-R`` when *recursive*).

        Builds ``owner:group``. A falsy *group* is omitted (just ``owner``) and
        a falsy *owner* yields ``:group``.
        """
        ownership = owner or ''
        if group:
            ownership += ':' + group
        cmd = self.hadoop_cmd + ['-chown'] + (['-R'] if recursive else []) + [ownership, path]
        return self.__call_check(cmd)

    def count(self, path):
        """Return the ``-count`` totals for *path*.

        :returns: dict with int values under 'dir_count', 'file_count' and
            'content_size', taken from the first three columns of the first
            output line that has three numeric columns followed by a path.
        :raises HdfsCommandError: if the command fails or no such line appears.
        """
        stdout = self.__call_check(self.hadoop_cmd + ['-count', path])
        for line in stdout.splitlines():
            fields = line.split(None, 3)
            if len(fields) == 4 and all(field.isdigit() for field in fields[:3]):
                dir_count, file_count, content_size = (int(field) for field in fields[:3])
                return {'dir_count': dir_count, 'file_count': file_count, 'content_size': content_size}
        raise HdfsCommandError('unexpected "-count" output for {0}.'.format(path))

    def copy(self, path, destination):
        """Copy HDFS *path* to *destination* with ``-cp`` after checking the source exists."""
        if not self.exists(path):
            raise HdfsCommandError('source file does not exist.')
        cmd = self.hadoop_cmd + ['-cp', path, destination]
        return self.__call_check(cmd)

    def put(self, local_path, destination):
        """Upload local file/directory *local_path* to HDFS *destination* with ``-put``."""
        if not self.__local_path_exists(local_path):
            raise HdfsCommandError('source file does not exist.')
        cmd = self.hadoop_cmd + ['-put', local_path, destination]
        return self.__call_check(cmd)

    def get(self, path, local_destination):
        """Download HDFS *path* to *local_destination* with ``-get``.

        Refuses (raises HdfsCommandError) if *local_destination* already exists
        locally as a file or directory.
        """
        if self.__local_path_exists(local_destination):
            raise HdfsCommandError('destination file already exist.')
        cmd = self.hadoop_cmd + ['-get', path, local_destination]
        return self.__call_check(cmd)

    def cat(self, path):
        """Return the contents of HDFS *path* (via ``-cat``), decoded as UTF-8."""
        cmd = self.hadoop_cmd + ['-cat', path]
        return self.__call_check(cmd)

    def mkdir(self, path, parents=True, raise_if_exists=False):
        """Create directory *path* with ``-mkdir`` (adding ``-p`` when *parents*).

        If *path* already exists: raise HdfsCommandError when *raise_if_exists*
        is true, otherwise return '' without running ``-mkdir``. Otherwise
        returns the command's stdout.
        """
        # `-mkdir -p` is documented to behave like Unix `mkdir -p`, which does
        # not fail on an existing directory, so only probe when it matters.
        if raise_if_exists or not parents:
            if self.exists(path):
                if raise_if_exists:
                    raise HdfsCommandError('destination directory already exists.')
                return ''
        cmd = self.hadoop_cmd + ['-mkdir'] + (['-p'] if parents else []) + [path]
        return self.__call_check(cmd)

    def listdir(self, path, ignore_directories=False, ignore_files=False,
                include_size=False, include_type=False, include_time=False, recursive=False):
        """List *path* with ``-ls`` (``-ls -R`` when *recursive*).

        :param ignore_directories: skip entries whose type char is 'd'.
        :param ignore_files: skip entries whose type char is '-'.
        :param include_size: append the size column as an int.
        :param include_type: append the type char ('d', '-' or 'l').
        :param include_time: append the modification date/time as a naive
            ``datetime`` parsed from ``YYYY-MM-DD HH:MM``.
        :returns: list of entry names, or tuples ``(name, *extras)`` in the
            order size, type, time when any include_* flag is set.
            An empty *path* lists '.'.
        """
        import datetime  # only needed for include_time

        cmd = self.hadoop_cmd + ['-ls'] + (['-R'] if recursive else []) + [path or '.']
        entries = []
        for line in self.__call_check(cmd).splitlines():
            # Entry rows are 8 whitespace-separated fields with the name last,
            # so maxsplit=7 keeps names that contain spaces intact:
            #   permissions replication owner group size date time name
            fields = line.split(None, 7)
            if len(fields) != 8:
                continue  # e.g. the "Found N items" header line
            perms, _replication, _owner, _group, size, date, clock, name = fields
            entry_type = perms[0]
            if entry_type not in ('d', '-', 'l') or not size.isdigit():
                continue  # not a listing row; skip any other tool output
            if ignore_directories and entry_type == 'd':
                continue
            if ignore_files and entry_type == '-':
                continue
            extras = ()
            if include_size:
                extras += (int(size),)
            if include_type:
                extras += (entry_type,)
            if include_time:
                extras += (datetime.datetime.strptime(date + ' ' + clock, '%Y-%m-%d %H:%M'),)
            entries.append((name,) + extras if extras else name)
        return entries

    def touchz(self, path):
        """Create an empty file at *path* with ``-touchz``; returns the command's stdout."""
        return self.__call_check(self.hadoop_cmd + ['-touchz', path])
if __name__ == '__main__':
    # Manual smoke test: print the contents of hello.txt using the default
    # `hdfs dfs` command prefix, with each command echoed to stderr.
    # Example invocation: /opt/python36/bin/python3 test_hdfs.py
    client = HdfsClient(debug=True)
    print(client.cat('hello.txt'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment