-
-
Save jborean93/a3cb93fa6237012ebf587e1fbe8fc903 to your computer and use it in GitHub Desktop.
# Copyright: (c) 2019, Jordan Borean (@jborean93) <jborean93@gmail.com> | |
# MIT License (see LICENSE or https://opensource.org/licenses/MIT) | |
import uuid | |
from smbprotocol.connection import Connection | |
from smbprotocol.session import Session | |
from smbprotocol.open import CreateDisposition, CreateOptions, DirectoryAccessMask, FileAttributes, \ | |
FileInformationClass, ImpersonationLevel, Open, ShareAccess | |
from smbprotocol.tree import TreeConnect | |
class FileEntry(object): | |
def __init__(self, path, file_directory_info): | |
self.name = file_directory_info['file_name'].value.decode('utf-16-le') | |
self.path = r"%s\%s" % (path, self.name) | |
self.ctime = file_directory_info['creation_time'].value | |
self.atime = file_directory_info['last_access_time'].value | |
self.wtime = file_directory_info['last_write_time'].value | |
self.size = file_directory_info['allocation_size'].value | |
self.attributes = file_directory_info['file_attributes'].value | |
self.is_archive = self._flag_set(FileAttributes.FILE_ATTRIBUTE_ARCHIVE) | |
self.is_compressed = self._flag_set(FileAttributes.FILE_ATTRIBUTE_COMPRESSED) | |
self.is_directory = self._flag_set(FileAttributes.FILE_ATTRIBUTE_DIRECTORY) | |
self.is_hidden = self._flag_set(FileAttributes.FILE_ATTRIBUTE_HIDDEN) | |
self.is_normal = self._flag_set(FileAttributes.FILE_ATTRIBUTE_NORMAL) | |
self.is_readonly = self._flag_set(FileAttributes.FILE_ATTRIBUTE_READONLY) | |
self.is_reparse_point = self._flag_set(FileAttributes.FILE_ATTRIBUTE_REPARSE_POINT) | |
self.is_system = self._flag_set(FileAttributes.FILE_ATTRIBUTE_SYSTEM) | |
self.is_temporary = self._flag_set(FileAttributes.FILE_ATTRIBUTE_TEMPORARY) | |
def _flag_set(self, attribute): | |
return self.attributes & attribute == attribute | |
def _listdir(tree, path, pattern, recurse): | |
full_path = tree.share_name | |
if path != "": | |
full_path += r"\%s" % path | |
# We create a compound request that does the following; | |
# 1. Opens a handle to the directory | |
# 2. Runs a query on the directory to list all the files | |
# 3. Closes the handle of the directory | |
# This is done in a compound request so we send 1 packet instead of 3 at the expense of more complex code. | |
directory = Open(tree, path) | |
query = [ | |
directory.create( | |
ImpersonationLevel.Impersonation, | |
DirectoryAccessMask.FILE_LIST_DIRECTORY, | |
FileAttributes.FILE_ATTRIBUTE_DIRECTORY, | |
ShareAccess.FILE_SHARE_READ | ShareAccess.FILE_SHARE_WRITE, | |
CreateDisposition.FILE_OPEN, | |
CreateOptions.FILE_DIRECTORY_FILE, | |
send=False | |
), | |
directory.query_directory( | |
pattern, | |
FileInformationClass.FILE_DIRECTORY_INFORMATION, | |
send=False | |
), | |
directory.close(False, send=False) | |
] | |
query_reqs = tree.session.connection.send_compound( | |
[x[0] for x in query], | |
tree.session.session_id, | |
tree.tree_connect_id, | |
related=True | |
) | |
# Process the result of the create and close request before parsing the files. | |
query[0][1](query_reqs[0]) | |
query[2][1](query_reqs[2]) | |
# Parse the queried files and repeat if the entry is a directory and recurse=True. We ignore . and .. as they are | |
# not directories inside the queried dir. | |
entries = [] | |
ignore_entries = [".".encode('utf-16-le'), "..".encode('utf-16-le')] | |
for file_entry in query[1][1](query_reqs[1]): | |
if file_entry['file_name'].value in ignore_entries: | |
continue | |
fe = FileEntry(full_path, file_entry) | |
entries.append(fe) | |
if fe.is_directory and recurse: | |
dir_path = r"%s\%s" % (path, fe.name) if path != "" else fe.name | |
entries += _listdir(tree, dir_path, recurse) | |
return entries | |
def smb_listdir(path, pattern='*', username=None, password=None, encrypt=True, recurse=False): | |
""" | |
List the files and folders in an SMB path and it's attributes. | |
:param path: The full SMB share to list, this should be \\server\share with an optional path added to the end. | |
:param pattern: The glob-like pattern to filter out files, defaults to '*' which matches all files and folders. | |
:param username: Optional username to use for authentication, required if Kerberos is not used. | |
:param password: Optional password to use for authentication, required if Kerberos is not used. | |
:param enrypt: Whether to use encryption or not, Must be set to False if using an older SMB Dialect. | |
:param recurse: Whether to search recursively or just the top level. | |
:return: A list of FileEntry objects | |
""" | |
path_split = [e for e in path.split('\\') if e] | |
if len(path_split) < 2: | |
raise Exception("Path should specify at least the server and share to connect to.") | |
server = path_split[0] | |
share = path_split[1] | |
share_path = "\\".join(path_split[2:]) if len(path_split) > 2 else '' | |
conn = Connection(uuid.uuid4(), server) | |
conn.connect() | |
try: | |
session = Session(conn, username=username, password=password, require_encryption=encrypt) | |
session.connect() | |
try: | |
tree = TreeConnect(session, r"\\%s\%s" % (server, share)) | |
tree.connect() | |
try: | |
return _listdir(tree, share_path, pattern, recurse) | |
finally: | |
tree.disconnect() | |
finally: | |
session.disconnect() | |
finally: | |
conn.disconnect() | |
files = smb_listdir(r'\\server.domain.local\c$\folder') |
Here is one way https://gist.github.com/jborean93/255a104f69a791868e92ac5d37155963. You can adjust the buffer size but it will read as much data based on the max length negotiated. You can handle the bytes directly if you wish but that example will show you how to store it in a BytesIO object.
@jborean93 Thanks for providing these gists. I've tried implementing this and it works great if I try to list the directories at the root of the share. However, whenever I try to go into a subdirectory I'm getting this error: SMBResponseException: Received unexpected status from the server: (3221226071) UNKNOWN_ENUM: 0xc0000257
. Any idea what might be causing this?
Thanks for your work on this package!
According to https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-erref/596a1078-e883-4972-9bbc-49e60bebca55, the NtStatus 0xc0000257
means STATUS_PATH_NOT_COVERED
which tells me the path is potentially a DFS share or maybe mapped to one. Unfortunately smbprotocol does not support DFS right now. Also I recommend you keep a lookout for the easy-api branch of smbprotocol which makes this a whole lot easier for you. When merged and included in a release this whole snippet would just become
import smbclient
for dir_entry in smbclient.scandir(r"\\server\share\dir", username="user", password="pass"):
...
Thank you!! I tried searching for that error code but didn't come up with that documentation page. Thanks for being an unofficial help desk for me; you didn't have to do that. I'll start researching to see what I can try for mounting a DFS share.
Thanks again for this.
What's the most efficient way to (given we know the full file path of a particular file) read a file and 'write' it to an in-memory file (e.g.
io.BytesIO(b'')
? I'm scraping some data from Excel files in a share by converting them to DataFrames, etc.