Created
June 3, 2015 12:24
-
-
Save jean-helsinki/85b3ea97b7445abdd9a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Based on: | |
https://gist.github.com/meeuw/c3bc9dd07945c87c89e6#file-findfiles-py | |
https://bitbucket.org/nosklo/pysmbclient/wiki/Home | |
""" | |
import os | |
import pexpect | |
import re | |
import locale | |
import datetime | |
import time | |
import logging | |
__logger__ = logging.getLogger(os.path.basename(__file__)) | |
try: | |
datetime_strptime = datetime.datetime.strptime | |
except AttributeError: | |
# python version older than 2.5 | |
def datetime_strptime(date, format): | |
return datetime.datetime(*(time.strptime(date, format)[:6])) | |
_volume_re = re.compile(r""" | |
Volume:\s # label | |
\|([^|]*)\|\s # the volume name | |
serial\snumber\s # another label | |
0x([a-f0-9]+) # hex serial number | |
$ # end of line | |
""", re.VERBOSE) | |
_smb_header_re = re.compile(r""" | |
Domain=\[([^]]+)\]\s | |
OS=\[([^]]+)\]\s | |
Server=\[([^]]+)\] | |
$ | |
""", re.VERBOSE) | |
_file_re = re.compile(r""" | |
\s* # file lines start with some spaces | |
(.*?)\s+ # capture filename non-greedy, eating remaining spaces | |
([ADHSR]*) # capture file mode | |
\s+ # after the mode you can have any number of spaces | |
(\d+) # file size | |
\s+ # spaces after file size | |
( # begin date capturing | |
\w{3} # abbrev weekday | |
\s # space | |
\w{3} # abbrev month | |
\s{1,2} # one or two spaces before the day | |
\d{1,2} # day | |
\s # a space before the time | |
\d{2}:\d{2}:\d{2} # time | |
\s # space | |
\d{4} # year | |
) # end date capturing | |
$ # end of string""", re.VERBOSE) | |
_cwd_re = re.compile(ur"Current directory is \\\\(?P<host>[^\\]+)\\((?P<share>[^\\]+))(?P<rel_path>.+)") | |
_SMB_PROMPT = ur'smb:\s.*\>' | |
def _to_smb_path_fmt(path): | |
return path.replace(os.sep, '\\') | |
def _to_local_path_fmt(path): | |
return path.replace('\\', os.sep) | |
class SambaClientError(OSError): pass | |
class SambaClient(): | |
def __init__(self, host, share, username, domain=".", password=""): | |
smb_cmd = 'smbclient -W {domain} -U "{username}" //{host}/{share}/'.format(**locals()) | |
self.username = username | |
self.domain = domain | |
self.host = host | |
self.share = share | |
self.p = pexpect.spawn(smb_cmd, ) | |
self._put_pwd(password) | |
self._runcmd_error_on_data("prompt ON") | |
def _put_pwd(self, pwd): | |
self.p.expect(ur'(?i).*password:') | |
self.p.sendline(pwd) | |
self.p.expect(_SMB_PROMPT) | |
def _runcmd(self, command=None, *args): | |
if command: | |
f_args = " ".join('"%s"' % arg for arg in args) | |
full_cmd = command + " " + f_args | |
__logger__.debug("SMBCLIENT CMD: {0}".format(full_cmd)) | |
self.p.sendline(full_cmd) | |
self.p.expect(_SMB_PROMPT) | |
raw_out = self.p.before.decode('utf-8') | |
return raw_out.replace(full_cmd, "", 1).strip() | |
def _runcmd_error_on_data(self, cmd, *args): | |
"""raises SambaClientError if cmd returns any data""" | |
def term_bs_in_out(s): | |
return " \b" in s | |
data = self._runcmd(cmd, *args) | |
if data and not term_bs_in_out(data): | |
raise SambaClientError("Error on %r: %r" % (cmd, data)) | |
return data | |
def lsdir(self, path): | |
""" | |
Lists a directory | |
returns a list of tuples in the format: | |
[(filename, modes, size, date), ...] | |
""" | |
path = os.path.join(path, u'*') | |
return self.glob(path) | |
def glob(self, path): | |
""" | |
Lists a glob (example: "/files/somefile.*") | |
returns a list of tuples in the format: | |
[(filename, modes, size, date), ...] | |
""" | |
files = self._runcmd(u'ls', path).splitlines() | |
for filedata in files: | |
m = _file_re.match(filedata) | |
if m: | |
name, modes, size, date = m.groups() | |
if name == '.' or name == '..': | |
continue | |
size = int(size) | |
# Resets locale to "C" to parse english date properly | |
# (non thread-safe code) | |
loc = locale.getlocale(locale.LC_TIME) | |
locale.setlocale(locale.LC_TIME, 'C') | |
date = datetime_strptime(date, '%a %b %d %H:%M:%S %Y') | |
locale.setlocale(locale.LC_TIME, loc) | |
yield (name, modes, size, date) | |
def listdir(self, path): | |
"""Emulates os.listdir()""" | |
result = [f[0] for f in self.lsdir(path)] | |
if not result: # can mean both that the dir is empty or not found | |
# disambiguation: verifies if the path doesn't exist. Let the error | |
# raised by _getfile propagate in that case. | |
self._getfile(path) | |
return result | |
def _getfile(self, path): | |
try: | |
f = self.glob(path).next() | |
except StopIteration: | |
raise SambaClientError('Path not found: %r' % path) | |
return f | |
def info(self, path): | |
"""Fetches information about a file""" | |
path = _to_smb_path_fmt(path) | |
data = self._runcmd(u'allinfo', path) | |
if data.startswith('ERRSRV'): | |
raise SambaClientError( | |
'Error retrieving info for %r: %r' % (path, data.strip())) | |
result = {} | |
for info in data.splitlines(): | |
k, sep, v = info.partition(':') | |
if sep: | |
result[k.strip()] = v.strip() | |
return result | |
def diskinfo(self): | |
"""Fetches information about a volume""" | |
data = self._runcmd('volume') | |
for line in data.splitlines(): | |
m = _volume_re.match(line) | |
if m: | |
name, serial = m.groups() | |
return name, int(serial, 16) | |
else: | |
raise SambaClientError( | |
'Error retrieving disk information: %r' % data) | |
def volume(self): | |
"""Fetches the volume name""" | |
return self.diskinfo()[0] | |
def serial(self): | |
"""Fetches the volume serial""" | |
return self.diskinfo()[1] | |
def isdir(self, path): | |
"""Returns True if path is a directory/folder""" | |
return 'D' in self._getfile(path)[1] | |
def isfile(self, path): | |
"""Returns True if path is a regular file""" | |
return not self.isdir(path) | |
def exists(self, path): | |
"""Returns True if path exists in the remote host""" | |
try: | |
self._getfile(path) | |
except SambaClientError: | |
return False | |
else: | |
return True | |
def mkdir(self, path): | |
"""Creates a new folder remotely""" | |
path = _to_smb_path_fmt(path) | |
self._runcmd_error_on_data(u'mkdir', path) | |
def cd(self, path): | |
"""Change remote dir""" | |
path = _to_smb_path_fmt(path) | |
self._runcmd_error_on_data(u'cd', path) | |
def lcd(self, path): | |
"""Change local dir""" | |
self._runcmd_error_on_data(u'lcd', path) | |
def rmdir(self, path): | |
"""Removes a remote empty folder""" | |
path = _to_smb_path_fmt(path) | |
self._runcmd_error_on_data(u'rmdir', path) | |
def unlink(self, path): | |
"""Removes/deletes/unlinks a file or folder""" | |
path = _to_smb_path_fmt(path) | |
if self.isdir(path): | |
try: | |
self.set_recurse() | |
self._runcmd_error_on_data(u'del', path + "\\*") | |
self.rmdir(path) | |
finally: | |
self.set_recurse(False) | |
else: | |
self._runcmd_error_on_data(u'del', path) | |
remove = unlink | |
def chmod(self, path, *modes): | |
"""Set/reset file modes | |
Tested with: AHS | |
smbc.chmod('/file.txt', '+H') | |
""" | |
path = _to_smb_path_fmt(path) | |
plus_modes = [] | |
minus_modes = [] | |
for mode in modes: | |
if mode.startswith(u'-'): | |
minus_modes.append(mode.lstrip(u'-')) | |
else: | |
plus_modes.append(mode.lstrip(u'+')) | |
modes = [] | |
if plus_modes: | |
modes.append(u'+%s' % u''.join(plus_modes)) | |
if minus_modes: | |
modes.append(u'-%s' % u''.join(minus_modes)) | |
self._runcmd_error_on_data(u'setmode', u''.join(modes)) | |
def rename(self, old_name, new_name): | |
old_name = _to_smb_path_fmt(old_name) | |
new_name = _to_smb_path_fmt(new_name) | |
self._runcmd_error_on_data(u'rename', old_name, new_name) | |
def download_file(self, remote_path, local_path): | |
remote_path = _to_smb_path_fmt(remote_path) | |
result = self._runcmd(u'get', remote_path, local_path) | |
def upload_file(self, local_path, remote_path): | |
remote_path = _to_smb_path_fmt(remote_path) | |
result = self._runcmd(u'put', local_path, remote_path) | |
def upload_update(self, local_path, remote_path): | |
remote_path = _to_smb_path_fmt(remote_path) | |
result = self._runcmd(u'reput', local_path, remote_path) | |
def walk(self, top, topdown=True): | |
names = self.glob(os.path.join(top, "*")) | |
dirs, nondirs = [], [] | |
for item in names: | |
if 'D' in item[1]: | |
dirs.append(item[0]) | |
else: | |
nondirs.append(item[0]) | |
if topdown: | |
yield top, dirs, nondirs | |
for name in dirs: | |
new_path = os.path.join(top, name) | |
for x in self.walk(new_path, topdown): | |
yield x | |
if not topdown: | |
yield top, dirs, nondirs | |
def upload(self, local_path, remote_path): | |
if os.path.isdir(local_path): | |
if not self.exists(remote_path): | |
self.makedirs(remote_path) | |
for root, dirs, files in os.walk(local_path): | |
remote_root = root.replace(local_path, _to_local_path_fmt(remote_path)) | |
for d in dirs: | |
rem_dir = _to_smb_path_fmt(os.path.join(remote_root, d)) | |
if not self.exists(rem_dir): | |
self.makedirs(rem_dir) | |
for f in files: | |
rem_f = _to_smb_path_fmt(os.path.join(remote_root, f)) | |
self.upload_file(os.path.join(root, f), rem_f) | |
else: | |
basedir = os.path.dirname(remote_path) | |
if not self.exists(basedir): | |
self.makedirs(basedir) | |
self.upload_file(local_path, remote_path) | |
def download(self, remote_path, local_path): | |
if self.isdir(remote_path): | |
if not os.path.exists(local_path): | |
os.makedirs(local_path) | |
for root, dirs, files in self.walk(remote_path): | |
local_root = root.replace(_to_local_path_fmt(remote_path), local_path) | |
for d in dirs: | |
loc_dir = os.path.join(local_root, d) | |
if not os.path.exists(loc_dir): | |
os.makedirs(loc_dir) | |
for f in files: | |
loc_f = os.path.join(local_root, f) | |
self.download_file(_to_smb_path_fmt(os.path.join(root, f)), loc_f) | |
else: | |
basedir = os.path.dirname(local_path) | |
if not os.path.exists(basedir): | |
self.makedirs(basedir) | |
self.download_file(remote_path, local_path) | |
def get_cwd(self): | |
out = self._runcmd(u'pwd').strip() | |
m = _cwd_re.search(out) | |
if m: | |
return m.groupdict()["rel_path"] | |
else: | |
raise SambaClientError("pwd command fail") | |
def makedirs(self, remote_path): | |
remote_path = _to_smb_path_fmt(remote_path) | |
dir_items = remote_path.split("\\") | |
for i in xrange(len(dir_items)): | |
part_path = "\\".join(dir_items[:i+1]) | |
if not self.exists(part_path): | |
self.mkdir(part_path) | |
def set_recurse(self, recurse=True): | |
cwd = self.get_cwd() | |
try: | |
if cwd != "\\": self.cd("\\") | |
flag = "ON" if recurse else "OFF" | |
self._runcmd_error_on_data("recurse {0}".format(flag)) | |
finally: | |
if cwd != "\\": self.cd(cwd) | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, exc_traceback): | |
self.close() | |
def __repr__(self): | |
return '<SambaClient({self.domain}\\{self.username}@//{self.host}/{self.share})>'.format(self=self) | |
def close(self): | |
pass | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
nice! thanks for sharing!