-
-
Save idlesign/6000f730a7fa2421b34a5287d6011a8b to your computer and use it in GitHub Desktop.
smbclient Python Wrapper using pexpect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
https://gist.github.com/idlesign/6000f730a7fa2421b34a5287d6011a8b
Based on
https://gist.github.com/jean-helsinki/85b3ea97b7445abdd9a7
"""
from __future__ import unicode_literals | |
import locale | |
import logging | |
import os | |
import re | |
import sys | |
from datetime import datetime | |
import pexpect | |
from six.moves import range | |
# Module-level logger; the client logs every command it sends at DEBUG level.
LOGGER = logging.getLogger(__name__)

# Reply of the smbclient ``volume`` command.
# Groups: (volume name, hexadecimal serial number without the ``0x`` prefix).
RE_VOLUME = re.compile(r"""
    Volume:\s               # label
    \|([^|]*)\|\s           # the volume name
    serial\snumber\s        # another label
    0x([a-f0-9]+)           # hex serial number
    $                       # end of line
""", re.VERBOSE)

# Connection banner line. Groups: (domain, OS, server).
# NOTE(review): not referenced by the code visible in this file.
RE_HEADER = re.compile(r"""
    Domain=\[([^]]+)\]\s
    OS=\[([^]]+)\]\s
    Server=\[([^]]+)\]
    $
""", re.VERBOSE)

# One entry of an ``ls`` listing.
# Groups: (file name, attribute letters, size, date string).
# ``N`` (normal file) is accepted in addition to A/D/H/S/R: without it the
# letter would be swallowed into the lazily matched file name.
# NOTE(review): the attribute alphabet depends on the Samba version; extend the
# character class if the server reports other letters.
RE_LS_LINE = re.compile(r"""
    \s*                     # file lines start with some spaces
    (.*?)\s+                # capture filename non-greedy, eating remaining spaces
    ([ADHSRN]*)             # capture file mode
    \s+                     # after the mode you can have any number of spaces
    (\d+)                   # file size
    \s+                     # spaces after file size
    (                       # begin date capturing
        \w{3}               # abbrev weekday
        \s                  # space
        \w{3}               # abbrev month
        \s{1,2}             # one or two spaces before the day
        \d{1,2}             # day
        \s                  # a space before the time
        \d{2}:\d{2}:\d{2}   # time
        \s                  # space
        \d{4}               # year
    )                       # end date capturing
    $                       # end of string""", re.VERBOSE)

# Reply of the ``pwd`` command. Named groups: host, share, rel_path
# (rel_path keeps its leading backslash, so the share root is a lone backslash).
RE_CWD = re.compile(r'Current directory is \\\\(?P<host>[^\\]+)\\((?P<share>[^\\]+))(?P<rel_path>.+)')

# Interactive prompt pattern handed to pexpect to detect the end of a reply.
RE_PROMPT = r'smb:\s.*\>'
def _to_smb_path_fmt(path): | |
return path.replace(os.sep, '\\') | |
def _to_local_path_fmt(path): | |
return path.replace('\\', os.sep) | |
class SambaClientError(Exception):
    """Error raised by SambaClient when an smbclient command does not succeed."""
class SambaClient(object):
    """Wrapper around an interactive ``smbclient`` session driven through pexpect.

    One child process is spawned per instance. Every public method sends one
    or more commands to that process and parses the text printed before the
    next prompt. Instances can be used as context managers; leaving the block
    closes the child process.
    """

    def __init__(self, host, share, username, domain='.', password='', mode=None, debug=False):
        """Spawn ``smbclient``, log in and switch interactive prompting on.

        :param host: server name or address
        :param share: share name on the server
        :param username: user to authenticate as
        :param domain: workgroup/domain passed with ``-W``
        :param password: password sent at the password prompt
        :param mode: optional value passed with ``-m``
        :param debug: any truthy value mirrors the session to stdout;
            a non-bool value is additionally passed with ``-d`` as debug level
        """
        # The command line is passed as an argument list so that values
        # containing spaces or quotes cannot be split into several arguments.
        args = []
        if mode:
            args.extend(['-m', '%s' % mode])
        if not isinstance(debug, bool):
            # Support integer to set debug level.
            args.extend(['-d', '%s' % debug])
        args.extend([
            '-W', domain,
            '-U', username,
            '//{host}/{share}/'.format(host=host, share=share),
        ])

        self.username = username
        self.domain = domain
        self.host = host
        self.share = share

        prc = pexpect.spawn('smbclient', args, echo=False, encoding='utf-8')
        if debug:
            # Send the pexpect log straight to stdout for debugging.
            prc.logfile = sys.stdout
        self.prc = prc

        try:
            self._supply_password(password)
            self._runcmd_error_on_data('prompt ON')
        except Exception:
            # Do not leave an orphaned child process behind a failed login.
            self.close()
            raise

    def _supply_password(self, pwd):
        """Answer the password prompt and wait for the first command prompt."""
        prc = self.prc
        prc.expect(r'(?i).*password:')
        prc.sendline(pwd)
        prc.expect(RE_PROMPT)

    def _runcmd(self, command=None, *args):
        """Send a command and return the text printed before the next prompt.

        Each argument is wrapped in double quotes. Without a command only an
        empty line is sent.

        :param command: smbclient command name
        :param args: command arguments
        :return: output with the first copy of the command removed, stripped
        """
        prc = self.prc
        cmd = ''
        if command:
            cmd = '%s %s' % (command, ' '.join('"%s"' % arg for arg in args))
            LOGGER.debug('SMB command: %s', cmd)
        prc.sendline(cmd)
        prc.expect(RE_PROMPT)
        return prc.before.replace(cmd, '', 1).strip()

    def _runcmd_error_on_data(self, cmd, *args):
        """Run a command and raise SambaClientError if any data is returned.

        :param cmd: smbclient command name
        :param args: command arguments
        :return: the (normally empty) command output
        :raise: SambaClientError
        """
        data = self._runcmd(cmd, *args)
        # Output containing a space followed by a backspace is tolerated.
        # NOTE(review): presumably terminal line-editing noise; confirm.
        if data and ' \b' not in data:
            raise SambaClientError('Unexpected result for command %s: %s' % (cmd, data))
        return data

    def lsdir(self, path):
        """List a directory.

        :param path: remote directory
        :return: generator of ``(filename, modes, size, date)`` tuples
        """
        return self.glob(os.path.join(path, '*'))

    def glob(self, path):
        """List a glob (example: "/files/somefile.*").

        Output lines that do not look like a listing entry and the ``.`` and
        ``..`` entries are skipped.

        :param path: remote glob pattern
        :return: generator of ``(filename, modes, size, date)`` tuples with
            ``size`` as int and ``date`` as datetime
        """
        for line in self._runcmd('ls', path).splitlines():
            match = RE_LS_LINE.match(line)
            if not match:
                continue
            name, modes, size, date = match.groups()
            if name in ('.', '..'):
                continue
            # Resets locale to "C" to parse english date properly
            # (non thread-safe code). The previous locale is restored even
            # when parsing fails.
            saved_locale = locale.getlocale(locale.LC_TIME)
            locale.setlocale(locale.LC_TIME, str('C'))
            try:
                parsed = datetime.strptime(date, '%a %b %d %H:%M:%S %Y')
            finally:
                locale.setlocale(locale.LC_TIME, saved_locale)
            yield (name, modes, int(size), parsed)

    def listdir(self, path):
        """Emulate os.listdir(): return the entry names of a remote directory.

        :raise: SambaClientError if the listing is empty and the path itself
            cannot be found
        """
        result = [entry[0] for entry in self.lsdir(path)]
        if not result:
            # An empty listing can mean both that the dir is empty or not
            # found. Disambiguation: let the error raised by _get_file_info
            # propagate when the path doesn't exist.
            self._get_file_info(path)
        return result

    def _get_file_info(self, path):
        """Return the first listing entry matching *path*.

        :raise: SambaClientError if nothing matches
        """
        # The next() builtin works on Python 2 and 3, unlike the .next() method.
        fileinfo = next(self.glob(path), None)
        if fileinfo is None:
            raise SambaClientError('Path not found: %s' % path)
        return fileinfo

    def info(self, path):
        """Fetch information about a file using the ``allinfo`` command.

        :param path: remote path
        :rtype: dict
        :return: mapping built from the ``key: value`` lines of the reply
        :raise: SambaClientError if the reply starts with ``ERRSRV``
        """
        path = _to_smb_path_fmt(path)
        data = self._runcmd('allinfo', path)
        if data.startswith('ERRSRV'):
            raise SambaClientError('Error retrieving info for %s: %s' % (path, data.strip()))
        result = {}
        for line in data.splitlines():
            key, sep, value = line.partition(':')
            if sep:
                result[key.strip()] = value.strip()
        return result

    def diskinfo(self):
        """Fetch information about a volume.

        :return: ``(volume name, serial number as int)``
        :raise: SambaClientError if no output line has the expected format
        """
        data = self._runcmd('volume')
        for line in data.splitlines():
            match = RE_VOLUME.match(line)
            if match:
                name, serial = match.groups()
                return name, int(serial, 16)
        raise SambaClientError('Error while retrieving disk info: %s' % data)

    def volume(self):
        """Fetch the volume name."""
        return self.diskinfo()[0]

    def serial(self):
        """Fetch the volume serial."""
        return self.diskinfo()[1]

    def isdir(self, path):
        """Return True if path is a directory/folder.

        :raise: SambaClientError if the path cannot be found
        """
        return 'D' in self._get_file_info(path)[1]

    def isfile(self, path):
        """Return True if path is a regular file (anything that is not a directory).

        :raise: SambaClientError if the path cannot be found
        """
        return not self.isdir(path)

    def exists(self, path):
        """Return True if path exists in the remote host."""
        try:
            self._get_file_info(path)
        except SambaClientError:
            return False
        return True

    def mkdir(self, path):
        """Create a new folder remotely."""
        self._runcmd_error_on_data('mkdir', _to_smb_path_fmt(path))

    def cd(self, path):
        """Change remote dir."""
        self._runcmd_error_on_data('cd', _to_smb_path_fmt(path))

    def lcd(self, path):
        """Change local dir."""
        self._runcmd_error_on_data('lcd', path)

    def rmdir(self, path):
        """Remove a remote empty folder."""
        self._runcmd_error_on_data('rmdir', _to_smb_path_fmt(path))

    def unlink(self, path):
        """Remove/delete/unlink a file or folder.

        For a folder, recursion is switched on, the folder contents are
        deleted, the folder itself is removed, and recursion is switched off
        again even when one of those steps fails.
        """
        # Strip trailing separators of either flavour.
        path = _to_smb_path_fmt(path).rstrip('\\/')
        if self.isdir(path):
            try:
                self.set_recurse()
                self._runcmd_error_on_data('del', path + '\\*')
                self.rmdir(path)
            finally:
                self.set_recurse(False)
        else:
            self._runcmd_error_on_data('del', path)

    remove = unlink

    def chmod(self, path, *modes):
        """Set/reset file modes.

        Modes starting with ``-`` are cleared; all others (with or without a
        leading ``+``) are set.

        Tested with: AHS

            smbc.chmod('/file.txt', '+H')
        """
        path = _to_smb_path_fmt(path)
        plus_modes = []
        minus_modes = []
        for mode in modes:
            if mode.startswith('-'):
                minus_modes.append(mode.lstrip('-'))
            else:
                plus_modes.append(mode.lstrip('+'))
        spec = []
        if plus_modes:
            spec.append('+%s' % ''.join(plus_modes))
        if minus_modes:
            spec.append('-%s' % ''.join(minus_modes))
        self._runcmd_error_on_data('setmode', path, ''.join(spec))

    def rename(self, old_name, new_name):
        """Rename a remote file or folder."""
        old_name = _to_smb_path_fmt(old_name)
        new_name = _to_smb_path_fmt(new_name)
        self._runcmd_error_on_data('rename', old_name, new_name)

    def download_file(self, remote_path, local_path):
        """Fetch a single remote file with ``get``; the command output is not checked."""
        self._runcmd('get', _to_smb_path_fmt(remote_path), local_path)

    def upload_file(self, local_path, remote_path):
        """Send a single local file with ``put``; the command output is not checked."""
        self._runcmd('put', local_path, _to_smb_path_fmt(remote_path))

    def upload_update(self, local_path, remote_path):
        """Send a single local file with ``reput``; the command output is not checked."""
        self._runcmd('reput', local_path, _to_smb_path_fmt(remote_path))

    def walk(self, top, topdown=True):
        """Walk through items in path recursively, like os.walk().

        :param top: remote directory to start from
        :param topdown: yield a directory before (True) or after (False)
            its subdirectories
        :return: generator of ``(path, dir names, non-dir names)`` tuples
        """
        dirs, nondirs = [], []
        for item in self.glob(os.path.join(top, '*')):
            if 'D' in item[1]:
                dirs.append(item[0])
            else:
                nondirs.append(item[0])
        if topdown:
            yield top, dirs, nondirs
        for name in dirs:
            for entry in self.walk(os.path.join(top, name), topdown):
                yield entry
        if not topdown:
            yield top, dirs, nondirs

    def upload(self, local_path, remote_path):
        """Upload a local file or directory, creating remote folders as needed."""
        if os.path.isdir(local_path):
            if not self.exists(remote_path):
                self.makedirs(remote_path)
            remote_base = _to_local_path_fmt(remote_path)
            for root, dirs, files in os.walk(local_path):
                # Swap only the leading prefix: the same text may occur again
                # deeper in the tree.
                remote_root = root.replace(local_path, remote_base, 1)
                for d in dirs:
                    rem_dir = _to_smb_path_fmt(os.path.join(remote_root, d))
                    if not self.exists(rem_dir):
                        self.makedirs(rem_dir)
                for f in files:
                    rem_f = _to_smb_path_fmt(os.path.join(remote_root, f))
                    self.upload_file(os.path.join(root, f), rem_f)
        else:
            basedir = os.path.dirname(remote_path)
            # An empty dirname means the current remote directory.
            if basedir and not self.exists(basedir):
                self.makedirs(basedir)
            self.upload_file(local_path, remote_path)

    def download(self, remote_path, local_path):
        """Download a remote file or directory, creating local folders as needed.

        :raise: SambaClientError if the remote path cannot be found
        """
        if self.isdir(remote_path):
            if not os.path.exists(local_path):
                os.makedirs(local_path)
            remote_base = _to_local_path_fmt(remote_path)
            for root, dirs, files in self.walk(remote_path):
                # Swap only the leading prefix: the same text may occur again
                # deeper in the tree.
                local_root = root.replace(remote_base, local_path, 1)
                for d in dirs:
                    loc_dir = os.path.join(local_root, d)
                    if not os.path.exists(loc_dir):
                        os.makedirs(loc_dir)
                for f in files:
                    loc_f = os.path.join(local_root, f)
                    self.download_file(_to_smb_path_fmt(os.path.join(root, f)), loc_f)
        else:
            basedir = os.path.dirname(local_path)
            # The target folder is local, so it is created with os.makedirs;
            # an empty dirname means the current local directory.
            if basedir and not os.path.exists(basedir):
                os.makedirs(basedir)
            self.download_file(remote_path, local_path)

    def get_cwd(self):
        """Return the current remote working directory relative to the share.

        :raise: SambaClientError if the ``pwd`` reply cannot be parsed
        """
        out = self._runcmd('pwd').strip()
        match = RE_CWD.search(out)
        if match:
            return match.groupdict()['rel_path']
        raise SambaClientError('pwd command failed')

    def makedirs(self, remote_path):
        """Create directories recursively; levels that already exist are kept."""
        remote_path = _to_smb_path_fmt(remote_path)
        dir_items = remote_path.split('\\')
        for idx, item in enumerate(dir_items):
            if not item:
                # A leading, trailing or doubled separator adds no new level.
                continue
            part_path = '\\'.join(dir_items[:idx + 1])
            if not self.exists(part_path):
                self.mkdir(part_path)

    def set_recurse(self, recurse=True):
        """Switch recursive mode on or off.

        The command is issued from the share root; the previous working
        directory is restored afterwards, also on failure.
        """
        cwd = self.get_cwd()
        try:
            if cwd != '\\':
                self.cd('\\')
            self._runcmd_error_on_data('recurse %s' % ('ON' if recurse else 'OFF'))
        finally:
            if cwd != '\\':
                self.cd(cwd)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def __repr__(self):
        return '<SambaClient({self.domain}\\{self.username}@//{self.host}/{self.share})>'.format(self=self)

    def close(self):
        """Close the connection to the ``smbclient`` child process.

        Safe to call more than once, and on an instance whose process was
        never spawned.
        """
        prc = getattr(self, 'prc', None)
        if prc is not None:
            prc.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment