Skip to content

Instantly share code, notes, and snippets.

@idlesign
Forked from jean-helsinki/gist:85b3ea97b7445abdd9a7
Last active March 7, 2018 10:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save idlesign/6000f730a7fa2421b34a5287d6011a8b to your computer and use it in GitHub Desktop.
Save idlesign/6000f730a7fa2421b34a5287d6011a8b to your computer and use it in GitHub Desktop.
smbclient Python Wrapper using pexpect
# -*- coding: utf-8 -*-
"""
https://gist.github.com/idlesign/6000f730a7fa2421b34a5287d6011a8b
Based on
https://gist.github.com/jean-helsinki/85b3ea97b7445abdd9a7
"""
from __future__ import unicode_literals
import locale
import logging
import os
import re
import sys
from datetime import datetime
import pexpect
from six.moves import range
LOGGER = logging.getLogger(__name__)
RE_VOLUME = re.compile(r"""
Volume:\s # label
\|([^|]*)\|\s # the volume name
serial\snumber\s # another label
0x([a-f0-9]+) # hex serial number
$ # end of line
""", re.VERBOSE)
RE_HEADER = re.compile(r"""
Domain=\[([^]]+)\]\s
OS=\[([^]]+)\]\s
Server=\[([^]]+)\]
$
""", re.VERBOSE)
RE_LS_LINE = re.compile(r"""
\s* # file lines start with some spaces
(.*?)\s+ # capture filename non-greedy, eating remaining spaces
([ADHSR]*) # capture file mode
\s+ # after the mode you can have any number of spaces
(\d+) # file size
\s+ # spaces after file size
( # begin date capturing
\w{3} # abbrev weekday
\s # space
\w{3} # abbrev month
\s{1,2} # one or two spaces before the day
\d{1,2} # day
\s # a space before the time
\d{2}:\d{2}:\d{2} # time
\s # space
\d{4} # year
) # end date capturing
$ # end of string""", re.VERBOSE)
RE_CWD = re.compile(r'Current directory is \\\\(?P<host>[^\\]+)\\((?P<share>[^\\]+))(?P<rel_path>.+)')
RE_PROMPT = r'smb:\s.*\>'
def _to_smb_path_fmt(path):
return path.replace(os.sep, '\\')
def _to_local_path_fmt(path):
return path.replace('\\', os.sep)
class SambaClientError(Exception):
pass
class SambaClient(object):
def __init__(self, host, share, username, domain='.', password='', mode=None, debug=False):
smb_cmd = 'smbclient {mode} -W {domain} -U "{username}" //{host}/{share}/'.format(
host=host,
share=share,
domain=domain,
username=username,
mode='-m %s' % mode if mode else '',
debug='' if isinstance(debug, bool) else '-d %s' % debug # Support integer to set debug level.
)
self.username = username
self.domain = domain
self.host = host
self.share = share
prc = pexpect.spawn(smb_cmd, echo=False, encoding='utf-8')
if debug:
# Отправка журнала pexpect прямо на вывод для отладки.
prc.logfile = sys.stdout
self.prc = prc
self._supply_password(password)
self._runcmd_error_on_data('prompt ON')
def _supply_password(self, pwd):
prc = self.prc
prc.expect(r'(?i).*password:')
prc.sendline(pwd)
prc.expect(RE_PROMPT)
def _runcmd(self, command=None, *args):
prc = self.prc
cmd = ''
if command:
cmd = '%s %s' % (command, ' '.join('"%s"' % arg for arg in args))
LOGGER.debug('SMB command: %s', cmd)
prc.send('%s%s' % (cmd, prc.linesep))
prc.expect(RE_PROMPT)
return prc.before.replace(cmd, '', 1).strip()
def _runcmd_error_on_data(self, cmd, *args):
"""Runs a command and raises SambaClientError if any data is returned.
:param cmd:
:param args:
:raise: SambaClientError
"""
data = self._runcmd(cmd, *args)
if data and ' \b' not in data:
raise SambaClientError('Unexpected result for command %s: %s' % (cmd, data))
return data
def lsdir(self, path):
"""
Lists a directory
returns a list of tuples in the format:
[(filename, modes, size, date), ...]
"""
path = os.path.join(path, '*')
return self.glob(path)
def glob(self, path):
"""
Lists a glob (example: "/files/somefile.*")
returns a list of tuples in the format:
[(filename, modes, size, date), ...]
"""
files = self._runcmd('ls', path).splitlines()
for filedata in files:
match = RE_LS_LINE.match(filedata)
if not match:
continue
name, modes, size, date = match.groups()
if name == '.' or name == '..':
continue
size = int(size)
# Resets locale to "C" to parse english date properly
# (non thread-safe code)
loc = locale.getlocale(locale.LC_TIME)
locale.setlocale(locale.LC_TIME, str('C'))
date = datetime.strptime(date, '%a %b %d %H:%M:%S %Y')
locale.setlocale(locale.LC_TIME, loc)
yield (name, modes, size, date)
def listdir(self, path):
"""Emulates os.listdir()"""
result = [f[0] for f in self.lsdir(path)]
if not result: # can mean both that the dir is empty or not found
# disambiguation: verifies if the path doesn't exist. Let the error
# raised by _get_file_info propagate in that case.
self._get_file_info(path)
return result
def _get_file_info(self, path):
try:
fileinfo = self.glob(path).next()
except StopIteration:
raise SambaClientError('Path not found: %s' % path)
return fileinfo
def info(self, path):
"""Fetches information about a file.
:param path:
:rtype: dict
"""
path = _to_smb_path_fmt(path)
data = self._runcmd('allinfo', path)
if data.startswith('ERRSRV'):
raise SambaClientError('Error retrieving info for %s: %s' % (path, data.strip()))
result = {}
for info in data.splitlines():
k, sep, v = info.partition(':')
if sep:
result[k.strip()] = v.strip()
return result
def diskinfo(self):
"""Fetches information about a volume"""
data = self._runcmd('volume')
for line in data.splitlines():
match = RE_VOLUME.match(line)
if match:
name, serial = match.groups()
return name, int(serial, 16)
else:
raise SambaClientError('Error while retrieving disk info: %s' % data)
def volume(self):
"""Fetches the volume name"""
return self.diskinfo()[0]
def serial(self):
"""Fetches the volume serial"""
return self.diskinfo()[1]
def isdir(self, path):
"""Returns True if path is a directory/folder"""
return 'D' in self._get_file_info(path)[1]
def isfile(self, path):
"""Returns True if path is a regular file"""
return not self.isdir(path)
def exists(self, path):
"""Returns True if path exists in the remote host"""
try:
self._get_file_info(path)
except SambaClientError:
return False
else:
return True
def mkdir(self, path):
"""Creates a new folder remotely"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data('mkdir', path)
def cd(self, path):
"""Change remote dir"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data('cd', path)
def lcd(self, path):
"""Change local dir"""
self._runcmd_error_on_data('lcd', path)
def rmdir(self, path):
"""Removes a remote empty folder"""
path = _to_smb_path_fmt(path)
self._runcmd_error_on_data('rmdir', path)
def unlink(self, path):
"""Removes/deletes/unlinks a file or folder"""
path = _to_smb_path_fmt(path).rstrip('\/')
if self.isdir(path):
try:
self.set_recurse()
self._runcmd_error_on_data('del', path + '\\*')
self.rmdir(path)
finally:
self.set_recurse(False)
else:
self._runcmd_error_on_data('del', path)
remove = unlink
def chmod(self, path, *modes):
"""Set/reset file modes
Tested with: AHS
smbc.chmod('/file.txt', '+H')
"""
path = _to_smb_path_fmt(path)
plus_modes = []
minus_modes = []
for mode in modes:
if mode.startswith('-'):
minus_modes.append(mode.lstrip('-'))
else:
plus_modes.append(mode.lstrip('+'))
modes = []
if plus_modes:
modes.append('+%s' % ''.join(plus_modes))
if minus_modes:
modes.append('-%s' % ''.join(minus_modes))
self._runcmd_error_on_data('setmode', path, ''.join(modes))
def rename(self, old_name, new_name):
old_name = _to_smb_path_fmt(old_name)
new_name = _to_smb_path_fmt(new_name)
self._runcmd_error_on_data('rename', old_name, new_name)
def download_file(self, remote_path, local_path):
remote_path = _to_smb_path_fmt(remote_path)
self._runcmd('get', remote_path, local_path)
def upload_file(self, local_path, remote_path):
remote_path = _to_smb_path_fmt(remote_path)
self._runcmd('put', local_path, remote_path)
def upload_update(self, local_path, remote_path):
remote_path = _to_smb_path_fmt(remote_path)
self._runcmd('reput', local_path, remote_path)
def walk(self, top, topdown=True):
"""Walks through item in path recursively."""
names = self.glob(os.path.join(top, '*'))
dirs, nondirs = [], []
for item in names:
if 'D' in item[1]:
dirs.append(item[0])
else:
nondirs.append(item[0])
if topdown:
yield top, dirs, nondirs
for name in dirs:
new_path = os.path.join(top, name)
for x in self.walk(new_path, topdown):
yield x
if not topdown:
yield top, dirs, nondirs
def upload(self, local_path, remote_path):
"""Uploads local file or directory."""
if os.path.isdir(local_path):
if not self.exists(remote_path):
self.makedirs(remote_path)
for root, dirs, files in os.walk(local_path):
remote_root = root.replace(local_path, _to_local_path_fmt(remote_path))
for d in dirs:
rem_dir = _to_smb_path_fmt(os.path.join(remote_root, d))
if not self.exists(rem_dir):
self.makedirs(rem_dir)
for f in files:
rem_f = _to_smb_path_fmt(os.path.join(remote_root, f))
self.upload_file(os.path.join(root, f), rem_f)
else:
basedir = os.path.dirname(remote_path)
if not self.exists(basedir):
self.makedirs(basedir)
self.upload_file(local_path, remote_path)
def download(self, remote_path, local_path):
"""Downloads remote file or directory."""
if self.isdir(remote_path):
if not os.path.exists(local_path):
os.makedirs(local_path)
for root, dirs, files in self.walk(remote_path):
local_root = root.replace(_to_local_path_fmt(remote_path), local_path)
for d in dirs:
loc_dir = os.path.join(local_root, d)
if not os.path.exists(loc_dir):
os.makedirs(loc_dir)
for f in files:
loc_f = os.path.join(local_root, f)
self.download_file(_to_smb_path_fmt(os.path.join(root, f)), loc_f)
else:
basedir = os.path.dirname(local_path)
if not os.path.exists(basedir):
self.makedirs(basedir)
self.download_file(remote_path, local_path)
def get_cwd(self):
"""Returns current working directory."""
out = self._runcmd('pwd').strip()
match = RE_CWD.search(out)
if match:
return match.groupdict()['rel_path']
raise SambaClientError('pwd command failed')
def makedirs(self, remote_path):
"""Creates directories recursively."""
remote_path = _to_smb_path_fmt(remote_path)
dir_items = remote_path.split('\\')
for i in range(len(dir_items)):
part_path = '\\'.join(dir_items[:i + 1])
if not self.exists(part_path):
self.mkdir(part_path)
def set_recurse(self, recurse=True):
cwd = self.get_cwd()
try:
if cwd != '\\':
self.cd('\\')
self._runcmd_error_on_data('recurse %s' % ('ON' if recurse else 'OFF'))
finally:
if cwd != '\\':
self.cd(cwd)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def __repr__(self):
return '<SambaClient({self.domain}\\{self.username}@//{self.host}/{self.share})>'.format(self=self)
def close(self):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment