Skip to content

Instantly share code, notes, and snippets.

@jborean93
Last active September 13, 2023 13:23
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jborean93/a3cb93fa6237012ebf587e1fbe8fc903 to your computer and use it in GitHub Desktop.
Save jborean93/a3cb93fa6237012ebf587e1fbe8fc903 to your computer and use it in GitHub Desktop.
Gets the contents of an directory exposes by SMB in Python
# Copyright: (c) 2019, Jordan Borean (@jborean93) <jborean93@gmail.com>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
import uuid
from smbprotocol.connection import Connection
from smbprotocol.session import Session
from smbprotocol.open import CreateDisposition, CreateOptions, DirectoryAccessMask, FileAttributes, \
FileInformationClass, ImpersonationLevel, Open, ShareAccess
from smbprotocol.tree import TreeConnect
class FileEntry(object):
def __init__(self, path, file_directory_info):
self.name = file_directory_info['file_name'].value.decode('utf-16-le')
self.path = r"%s\%s" % (path, self.name)
self.ctime = file_directory_info['creation_time'].value
self.atime = file_directory_info['last_access_time'].value
self.wtime = file_directory_info['last_write_time'].value
self.size = file_directory_info['allocation_size'].value
self.attributes = file_directory_info['file_attributes'].value
self.is_archive = self._flag_set(FileAttributes.FILE_ATTRIBUTE_ARCHIVE)
self.is_compressed = self._flag_set(FileAttributes.FILE_ATTRIBUTE_COMPRESSED)
self.is_directory = self._flag_set(FileAttributes.FILE_ATTRIBUTE_DIRECTORY)
self.is_hidden = self._flag_set(FileAttributes.FILE_ATTRIBUTE_HIDDEN)
self.is_normal = self._flag_set(FileAttributes.FILE_ATTRIBUTE_NORMAL)
self.is_readonly = self._flag_set(FileAttributes.FILE_ATTRIBUTE_READONLY)
self.is_reparse_point = self._flag_set(FileAttributes.FILE_ATTRIBUTE_REPARSE_POINT)
self.is_system = self._flag_set(FileAttributes.FILE_ATTRIBUTE_SYSTEM)
self.is_temporary = self._flag_set(FileAttributes.FILE_ATTRIBUTE_TEMPORARY)
def _flag_set(self, attribute):
return self.attributes & attribute == attribute
def _listdir(tree, path, pattern, recurse):
full_path = tree.share_name
if path != "":
full_path += r"\%s" % path
# We create a compound request that does the following;
# 1. Opens a handle to the directory
# 2. Runs a query on the directory to list all the files
# 3. Closes the handle of the directory
# This is done in a compound request so we send 1 packet instead of 3 at the expense of more complex code.
directory = Open(tree, path)
query = [
directory.create(
ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ | ShareAccess.FILE_SHARE_WRITE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE,
send=False
),
directory.query_directory(
pattern,
FileInformationClass.FILE_DIRECTORY_INFORMATION,
send=False
),
directory.close(False, send=False)
]
query_reqs = tree.session.connection.send_compound(
[x[0] for x in query],
tree.session.session_id,
tree.tree_connect_id,
related=True
)
# Process the result of the create and close request before parsing the files.
query[0][1](query_reqs[0])
query[2][1](query_reqs[2])
# Parse the queried files and repeat if the entry is a directory and recurse=True. We ignore . and .. as they are
# not directories inside the queried dir.
entries = []
ignore_entries = [".".encode('utf-16-le'), "..".encode('utf-16-le')]
for file_entry in query[1][1](query_reqs[1]):
if file_entry['file_name'].value in ignore_entries:
continue
fe = FileEntry(full_path, file_entry)
entries.append(fe)
if fe.is_directory and recurse:
dir_path = r"%s\%s" % (path, fe.name) if path != "" else fe.name
entries += _listdir(tree, dir_path, recurse)
return entries
def smb_listdir(path, pattern='*', username=None, password=None, encrypt=True, recurse=False):
"""
List the files and folders in an SMB path and it's attributes.
:param path: The full SMB share to list, this should be \\server\share with an optional path added to the end.
:param pattern: The glob-like pattern to filter out files, defaults to '*' which matches all files and folders.
:param username: Optional username to use for authentication, required if Kerberos is not used.
:param password: Optional password to use for authentication, required if Kerberos is not used.
:param enrypt: Whether to use encryption or not, Must be set to False if using an older SMB Dialect.
:param recurse: Whether to search recursively or just the top level.
:return: A list of FileEntry objects
"""
path_split = [e for e in path.split('\\') if e]
if len(path_split) < 2:
raise Exception("Path should specify at least the server and share to connect to.")
server = path_split[0]
share = path_split[1]
share_path = "\\".join(path_split[2:]) if len(path_split) > 2 else ''
conn = Connection(uuid.uuid4(), server)
conn.connect()
try:
session = Session(conn, username=username, password=password, require_encryption=encrypt)
session.connect()
try:
tree = TreeConnect(session, r"\\%s\%s" % (server, share))
tree.connect()
try:
return _listdir(tree, share_path, pattern, recurse)
finally:
tree.disconnect()
finally:
session.disconnect()
finally:
conn.disconnect()
files = smb_listdir(r'\\server.domain.local\c$\folder')
@jp495
Copy link

jp495 commented Jul 8, 2019

Thanks again for this.

What's the most efficient way to (given we know the full file path of a particular file) read a file and 'write' it to an in-memory file (e.g. io.BytesIO(b'')? I'm scraping some data from Excel files in a share by converting them to DataFrames, etc.

@jborean93
Copy link
Author

Here is one way https://gist.github.com/jborean93/255a104f69a791868e92ac5d37155963. You can adjust the buffer size but it will read as much data based on the max length negotiated. You can handle the bytes directly if you wish but that example will show you how to store it in a BytesIO object.

@jecolvin
Copy link

@jborean93 Thanks for providing these gists. I've tried implementing this and it works great if I try to list the directories at the root of the share. However, whenever I try to go into a subdirectory I'm getting this error: SMBResponseException: Received unexpected status from the server: (3221226071) UNKNOWN_ENUM: 0xc0000257. Any idea what might be causing this?

Thanks for your work on this package!

@jborean93
Copy link
Author

According to https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-erref/596a1078-e883-4972-9bbc-49e60bebca55, the NtStatus 0xc0000257 means STATUS_PATH_NOT_COVERED which tells me the path is potentially a DFS share or maybe mapped to one. Unfortunately smbprotocol does not support DFS right now. Also I recommend you keep a lookout for the easy-api branch of smbprotocol which makes this a whole lot easier for you. When merged and included in a release this whole snippet would just become

import smbclient

for dir_entry in smbclient.scandir(r"\\server\share\dir", username="user", password="pass"):
    ...

@jecolvin
Copy link

Thank you!! I tried searching for that error code but didn't come up with that documentation page. Thanks for being an unofficial help desk for me; you didn't have to do that. I'll start researching to see what I can try for mounting a DFS share.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment