Skip to content

Instantly share code, notes, and snippets.

@UserUnknownFactor
Last active March 23, 2023 08:45
Show Gist options
  • Save UserUnknownFactor/96d3d3246ce89e0c54feef3f2ee05243 to your computer and use it in GitHub Desktop.
Save UserUnknownFactor/96d3d3246ce89e0c54feef3f2ee05243 to your computer and use it in GitHub Desktop.
Python utility for Windows that maps the bad disk sectors reported as failed in the System event log to the NTFS files that occupy them (the same sectors also show up as bad physical sector reads in low-level disk checking tools that use the native Windows API).
import win32evtlog # requires pywin32 pre-installed (`pip install pywin32`)
import os, sys, platform, tempfile, subprocess, base64, hashlib
from struct import unpack
import ctypes
from ctypes.wintypes import HANDLE
from ctypes.wintypes import BOOL
from ctypes.wintypes import HWND
from ctypes.wintypes import DWORD
from ctypes.wintypes import ULONG
from ctypes.wintypes import USHORT
from ctypes.wintypes import LPVOID
from ctypes.wintypes import LPWSTR
# The script cannot work without elevation, so bail out early when the shell
# reports that the current user is not an administrator.
if ctypes.windll.shell32.IsUserAnAdmin() == 0:
    print('This script requires Administrator privileges...')
    # Exit with a non-zero status so that calling scripts can detect the failure.
    sys.exit(1)
# --- ctypes aliases for the Win32 type names used in the prototypes below ---
NULL = 0
HDEVINFO = ctypes.c_void_p
LPCTSTR = ctypes.c_wchar_p
PCTSTR = ctypes.c_wchar_p
PTSTR = ctypes.c_wchar_p
LPDWORD = PDWORD = ctypes.POINTER(DWORD)
LPBYTE = PBYTE = ctypes.c_void_p
# ULONG_PTR is a pointer-sized unsigned integer, not a pointer to ULONG.
# c_size_t has the same size as a pointer, so structure layouts are unchanged.
ULONG_PTR = ctypes.c_size_t
CHAR = ctypes.c_ubyte  # NOTE: unsigned byte, i.e. what Win32 calls BYTE
LPOVERLAPPED = LPVOID
LPSECURITY_ATTRIBUTES = LPVOID

# --- CreateFile access / share / disposition / attribute flags ---
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 1
GENERIC_ALL = 0x10000000
OPEN_EXISTING = 3
FILE_ATTRIBUTE_NORMAL = 0x00000080

# --- SetupAPI flags and property identifiers ---
DIGCF_PRESENT = 2
DIGCF_DEVICEINTERFACE = 0x10
IOCTL_STORAGE_GET_DEVICE_NUMBER = 0x002D1080
SPDRP_PHYSICAL_DEVICE_OBJECT_NAME = 0x0000000E

# --- Handle sentinel and Win32 error codes ---
INVALID_HANDLE_VALUE = -1
ERROR_PATH_NOT_FOUND = 3
ERROR_INSUFFICIENT_BUFFER = 122

# First byte of the event data that marks records to be skipped when the
# sector number is extracted from the System log.
SCSI_OP_IOCTL = 0x0F
class STORAGE_DEVICE_NUMBER(ctypes.Structure):
    """Output buffer layout for IOCTL_STORAGE_GET_DEVICE_NUMBER.

    Holds three DWORD members: DeviceType, DeviceNumber and PartitionNumber.
    """

    # All three members share the same type, so build the list from the names.
    _fields_ = [
        (member, DWORD)
        for member in ('DeviceType', 'DeviceNumber', 'PartitionNumber')
    ]
class GUID(ctypes.Structure):
    """ctypes layout of a Win32 GUID: one ULONG, two USHORTs and eight bytes.

    The constructor takes the eleven components positionally, in the same
    order in which GUID constants are conventionally written in C headers.
    """

    _fields_ = [
        ("data1", ULONG),
        ("data2", USHORT),
        ("data3", USHORT),
        # Eight unsigned bytes (c_ubyte is what the file-level CHAR alias is).
        ("data4", ctypes.c_ubyte * 8),
    ]

    def __init__(self, dw=0, w1=0, w2=0, b1=0, b2=0, b3=0, b4=0, b5=0, b6=0,
                 b7=0, b8=0):
        self.data1 = dw
        self.data2 = w1
        self.data3 = w2
        for position, byte in enumerate((b1, b2, b3, b4, b5, b6, b7, b8)):
            self.data4[position] = byte

    def __str__(self):
        """Return the canonical ``{xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx}`` text.

        Without this, structures that embed a GUID and format it with ``%s``
        would only show the default object representation.
        """
        tail = bytes(self.data4).hex()
        return (f"{{{self.data1:08x}-{self.data2:04x}-{self.data3:04x}-"
                f"{tail[:4]}-{tail[4:]}}}")
class SP_DEVICE_INTERFACE_DATA(ctypes.Structure):
    """ctypes layout with cbSize, InterfaceClassGuid, Flags and Reserved members."""

    _fields_ = [
        ('cbSize', DWORD),
        ('InterfaceClassGuid', GUID),
        ('Flags', DWORD),
        ('Reserved', ULONG_PTR),
    ]

    def __str__(self):
        # Short description built from the interface class GUID and the flags.
        return f"InterfaceClassGuid:{self.InterfaceClassGuid} Flags:{self.Flags}"


# Pointer-type aliases for the interface data and interface detail structures.
PSP_DEVICE_INTERFACE_DATA = ctypes.POINTER(SP_DEVICE_INTERFACE_DATA)
PSP_DEVICE_INTERFACE_DETAIL_DATA = ctypes.c_void_p
class SP_DEVICE_INTERFACE_DETAIL_DATA_W(ctypes.Structure):
    """ctypes layout with a cbSize DWORD followed by a two-byte DevicePath member."""

    _fields_ = [('cbSize', DWORD), ('DevicePath', CHAR * 2)]
class SP_DEVINFO_DATA(ctypes.Structure):
    """ctypes layout with cbSize, ClassGuid, DevInst and Reserved members.

    Instances are filled in by SetupDiEnumDeviceInfo while enumerating devices.
    """

    _fields_ = [
        ('cbSize', DWORD),
        ('ClassGuid', GUID),
        ('DevInst', DWORD),
        ('Reserved', ULONG_PTR),
    ]

    def __str__(self):
        # Short description built from the class GUID and the device instance.
        return f"ClassGuid:{self.ClassGuid} DevInst:{self.DevInst}"


# Pointer type used in the SetupAPI prototypes that take a device-info record.
PSP_DEVINFO_DATA = ctypes.POINTER(SP_DEVINFO_DATA)
def ValidHandle(value, func, arguments):
    """ctypes ``errcheck`` hook that rejects NULL and INVALID_HANDLE_VALUE results.

    Args:
        value: The converted return value of the foreign function. With a
            ``c_void_p`` restype a NULL result arrives as ``None`` and
            INVALID_HANDLE_VALUE (-1) arrives as the all-ones unsigned value.
        func: The foreign function object (unused, required by the protocol).
        arguments: The original call arguments (unused, required by the protocol).

    Returns:
        ``value`` unchanged when it denotes a usable handle.

    Raises:
        OSError: Built by ``ctypes.WinError()`` when the handle is invalid.
    """
    invalid_handle = ctypes.c_void_p(-1).value
    # `not value` covers both None (NULL via c_void_p) and a plain integer 0.
    if not value or value == invalid_handle:
        raise ctypes.WinError()
    return value
# Device interface class GUIDs for disks and volumes.
GUID_DEVINTERFACE_DISK = GUID(
    0x53f56307, 0xb6bf, 0x11d0,
    0x94, 0xf2, 0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_VOLUME = GUID(
    0x53F5630D, 0xB6BF, 0x11D0,
    0x94, 0xF2, 0x00, 0xA0, 0xC9, 0x1E, 0xFB, 0x8B)

# DLL handles shared by the wrappers in this file.
kernel32 = ctypes.windll.kernel32
setupapi = ctypes.windll.LoadLibrary("setupapi")

# HDEVINFO SetupDiGetClassDevsW(ClassGuid, Enumerator, hwndParent, Flags)
SetupDiGetClassDevs = setupapi.SetupDiGetClassDevsW
SetupDiGetClassDevs.restype = HDEVINFO
SetupDiGetClassDevs.errcheck = ValidHandle
SetupDiGetClassDevs.argtypes = [ctypes.POINTER(GUID), PCTSTR, HWND, DWORD]

# BOOL SetupDiEnumDeviceInfo(DeviceInfoSet, MemberIndex, DeviceInfoData)
SetupDiEnumDeviceInfo = setupapi.SetupDiEnumDeviceInfo
SetupDiEnumDeviceInfo.restype = BOOL
SetupDiEnumDeviceInfo.argtypes = [HDEVINFO, DWORD, PSP_DEVINFO_DATA]

# BOOL SetupDiGetDeviceRegistryPropertyW(DeviceInfoSet, DeviceInfoData, Property,
#     PropertyRegDataType, PropertyBuffer, PropertyBufferSize, RequiredSize)
SetupDiGetDeviceRegistryProperty = setupapi.SetupDiGetDeviceRegistryPropertyW
SetupDiGetDeviceRegistryProperty.restype = BOOL
SetupDiGetDeviceRegistryProperty.argtypes = [
    HDEVINFO, PSP_DEVINFO_DATA, DWORD, PDWORD, PBYTE, DWORD, PDWORD]

# BOOL SetupDiDestroyDeviceInfoList(DeviceInfoSet)
SetupDiDestroyDeviceInfoList = setupapi.SetupDiDestroyDeviceInfoList
SetupDiDestroyDeviceInfoList.restype = BOOL
SetupDiDestroyDeviceInfoList.argtypes = [HDEVINFO]
def _DeviceIoControl(devhandle, ioctl, inbuf, inbufsiz, outbuf, outbufsiz):
    """Call kernel32.DeviceIoControl synchronously (no OVERLAPPED structure).

    Args:
        devhandle: Handle of the opened device.
        ioctl: Control code to send.
        inbuf: Input buffer, or None.
        inbufsiz: Size of the input buffer in bytes.
        outbuf: Output buffer, or None.
        outbufsiz: Size of the output buffer in bytes.

    Returns:
        A ``(status, bytes_returned)`` tuple: the BOOL result of the call and
        the DWORD object that received the lpBytesReturned value.
    """
    native_call = kernel32.DeviceIoControl
    native_call.restype = BOOL
    native_call.argtypes = [
        HANDLE,        # hDevice
        DWORD,         # dwIoControlCode
        LPVOID,        # lpInBuffer
        DWORD,         # nInBufferSize
        LPVOID,        # lpOutBuffer
        DWORD,         # nOutBufferSize
        LPDWORD,       # lpBytesReturned
        LPOVERLAPPED,  # lpOverlapped
    ]

    # Storage for the byte count, passed to the API by reference.
    returned = DWORD(0)
    outcome = native_call(
        devhandle, ioctl,
        inbuf, inbufsiz,
        outbuf, outbufsiz,
        ctypes.byref(returned),
        None)
    return outcome, returned
def _CreateFile(filename, access, mode, creation, flags):
    """Call kernel32.CreateFileW and wrap the result in a ``HANDLE`` object.

    Args:
        filename: Path passed as lpFileName.
        access: Value passed as dwDesiredAccess.
        mode: Value passed as dwShareMode.
        creation: Value passed as dwCreationDisposition.
        flags: Value passed as dwFlagsAndAttributes.

    Returns:
        A ``HANDLE`` instance holding whatever CreateFileW returned; security
        attributes and template file are always passed as NULL.
    """
    native_open = kernel32.CreateFileW
    native_open.restype = HANDLE
    native_open.argtypes = [
        LPWSTR,                 # lpFileName
        DWORD,                  # dwDesiredAccess
        DWORD,                  # dwShareMode
        LPSECURITY_ATTRIBUTES,  # lpSecurityAttributes
        DWORD,                  # dwCreationDisposition
        DWORD,                  # dwFlagsAndAttributes
        HANDLE,                 # hTemplateFile
    ]
    raw_handle = native_open(filename, access, mode, NULL, creation, flags, NULL)
    return HANDLE(raw_handle)
class DeviceIoControl(object):
    """Context manager that opens a device path and sends it IOCTL requests.

    Usage::

        with DeviceIoControl(path) as dev:
            status, bytes_returned = dev.ioctl(code, None, 0, outbuf, outsize)
    """

    # CreateFileW flag values that have no module-level constant.
    _FILE_READ_DATA = 0x0001
    _FILE_SHARE_WRITE = 0x00000002

    def __init__(self, path):
        self.path = path
        self._fhandle = None

    def _validate_handle(self):
        """Raise ``Exception`` when no handle is open or the open failed."""
        if self._fhandle is None:
            raise Exception('No file handle')
        if self._fhandle.value == HANDLE(INVALID_HANDLE_VALUE).value:
            ecode = kernel32.GetLastError()
            if ecode == ERROR_PATH_NOT_FOUND:
                ecode = 'ERROR_PATH_NOT_FOUND'
            raise Exception(f'Failed to open {self.path}. Error: {ecode}')

    def ioctl(self, ctl, inbuf, inbufsiz, outbuf, outbufsiz):
        """Send control code ``ctl``; returns ``(status, bytes_returned)``."""
        self._validate_handle()
        return _DeviceIoControl(self._fhandle, ctl, inbuf, inbufsiz, outbuf, outbufsiz)

    def __enter__(self):
        # The desired access stays numerically what it always was (0x1), but
        # is now named correctly: the original passed FILE_SHARE_READ in the
        # access slot and 0 (exclusive) as the share mode. Sharing is opened
        # up so the request does not demand exclusive use of the device.
        self._fhandle = _CreateFile(
            self.path,
            self._FILE_READ_DATA,
            FILE_SHARE_READ | self._FILE_SHARE_WRITE,
            OPEN_EXISTING,
            0)
        self._validate_handle()
        return self

    def __exit__(self, typ, val, tb):
        # Detach the handle first so it can never be closed twice.
        handle, self._fhandle = self._fhandle, None
        if handle is None:
            return
        if handle.value == HANDLE(INVALID_HANDLE_VALUE).value:
            return
        kernel32.CloseHandle(handle)
# Cache of already resolved paths: winpath -> physical device object name.
map_disks = dict()


def get_device_for_nfi(winpath):
    r"""Find the physical device object name of the disk known as ``winpath``.

    Enumerates present disk device interfaces through SetupAPI, reads each
    device's SPDRP_PHYSICAL_DEVICE_OBJECT_NAME property, asks the device for
    its number with IOCTL_STORAGE_GET_DEVICE_NUMBER and compares the resulting
    ``\Device\Harddisk<N>\DR<N>`` string with ``winpath``. When the IOCTL
    fails, the enumeration index is used as ``<N>`` instead.

    Args:
        winpath: Disk path in the ``\Device\Harddisk<N>\DR<N>`` form.

    Returns:
        The physical device object name of the matching disk, or ``None``
        when no disk matched or the enumeration failed. Successful lookups
        are cached in ``map_disks``.

    Raises:
        Exception: When the device information set could not be obtained.
    """
    global map_disks
    if winpath in map_disks:
        return map_disks[winpath]

    found = None
    disk_guid = GUID_DEVINTERFACE_DISK
    g_hdi = SetupDiGetClassDevs(ctypes.byref(disk_guid), None, None,
                                DIGCF_PRESENT | DIGCF_DEVICEINTERFACE)
    # The handle comes back as an unsigned pointer value (or None for NULL),
    # so compare against the unsigned form of INVALID_HANDLE_VALUE.
    if g_hdi is None or g_hdi == HANDLE(INVALID_HANDLE_VALUE).value:
        raise Exception('SetupDiGetClassDevs returned an invalid handle')

    try:
        szHardwareID = ctypes.create_unicode_buffer(1024)
        lenHardwareID = ctypes.sizeof(szHardwareID) - 1
        devinfo = SP_DEVINFO_DATA()
        devinfo.cbSize = ctypes.sizeof(SP_DEVINFO_DATA)
        index = 0
        while SetupDiEnumDeviceInfo(g_hdi, index, ctypes.byref(devinfo)):
            current = index
            # Advance before any `continue`; otherwise a device whose
            # property cannot be read would be retried forever.
            index += 1
            ret = SetupDiGetDeviceRegistryProperty(
                g_hdi,
                ctypes.byref(devinfo),
                SPDRP_PHYSICAL_DEVICE_OBJECT_NAME,
                None,
                ctypes.byref(szHardwareID),
                lenHardwareID, None)
            if not ret:
                continue
            # Fallback name based on the enumeration order.
            disk = f'\\Device\\Harddisk{current}\\DR{current}'
            sdn = STORAGE_DEVICE_NUMBER()
            try:
                with DeviceIoControl('\\\\.\\GLOBALROOT' + szHardwareID.value) as dev:
                    status, _ = dev.ioctl(IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0,
                                          ctypes.pointer(sdn),
                                          ctypes.sizeof(STORAGE_DEVICE_NUMBER))
                    # not always working?
                    if status:
                        disk = f'\\Device\\Harddisk{sdn.DeviceNumber}\\DR{sdn.DeviceNumber}'
            except Exception:
                print(f'Error detecting harddisk device {szHardwareID.value} number...')
            if winpath == disk:
                found = szHardwareID.value
                map_disks[winpath] = found
                break
    except Exception:
        # Best effort: report the problem and fall through to return None.
        print(f'Error detecting device ID for {winpath}...')
    finally:
        SetupDiDestroyDeviceInfoList(g_hdi)
    return found
def call_with_output(command):
    """Run ``command`` and capture its standard output as text.

    Args:
        command: Argument list handed to ``subprocess.check_output``.

    Returns:
        A ``(success, output)`` tuple:
        ``(True, text)`` when the command exited with status 0,
        ``(False, text)`` when it exited with a non-zero status, and
        ``(False, None)`` when it could not be run at all.
    """
    try:
        raw = subprocess.check_output(command)
    except subprocess.CalledProcessError as failure:
        return (False, (failure.output or b'').decode(errors='replace'))
    except Exception:
        # Deliberately best-effort: e.g. the executable is missing or not runnable.
        return (False, None)
    # Tool output is not guaranteed to be valid UTF-8 (console code pages);
    # substitute undecodable bytes instead of turning success into failure.
    return (True, raw.decode(errors='replace'))
def get_disk_errors(tolerance=200):
    """Collect the bad sectors reported by the "disk" source in the System log.

    For every event whose source is "disk":

    * if the binary event data is longer than 139 bytes and does not start
      with SCSI_OP_IOCTL, the sector (LBA) is read big-endian from bytes
      130-133 and the length from bytes 134-137;
    * for events of type 2 the sector is taken from string insert 1 and the
      disk from string insert 3;
    * otherwise string insert 0 is resolved with ``get_device_for_nfi``.

    Args:
        tolerance: A sector is recorded only when it lies more than this many
            sectors after the last recorded one, or before it.

    Returns:
        A list of ``(disk, sector, nsectors)`` tuples; ``sector`` and
        ``nsectors`` are strings (``nsectors`` is ``''`` when unknown).
    """
    server = platform.node()  # name of the current computer to get event logs
    logtype = 'System'  # 'Application' # 'Security'
    hlog = win32evtlog.OpenEventLog(server, logtype)
    flags = win32evtlog.EVENTLOG_FORWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
    error_sectors = []
    sector = 0
    try:
        # Each read returns a batch of records; an empty batch marks the end.
        while True:
            events = win32evtlog.ReadEventLog(hlog, flags, 0)
            if not events:
                break
            for event in events:
                if event.SourceName != "disk":
                    continue
                # StringInserts may be missing; treat that as "no inserts".
                data = event.StringInserts or ()
                disk = ''
                sector_tmp = ''
                nsectors = ''
                if (event.Data and len(event.Data) > 139
                        and event.Data[0] != SCSI_OP_IOCTL):
                    sector_tmp = unpack('>L', event.Data[130:134])[0]  # LBA
                    nsectors = unpack('>L', event.Data[134:138])[0]  # length
                if event.EventType == 2:  # windows 8
                    # Both insert 1 (sector) and insert 3 (disk) are needed.
                    if len(data) > 3:
                        try:
                            sector_tmp = int(data[1], 0)
                        except ValueError:
                            # Insert 1 is not a number: not a usable record.
                            continue
                        disk = data[3]
                elif data:  # windows 10?
                    disk = get_device_for_nfi(data[0])
                if not disk or not sector_tmp:
                    continue
                if sector_tmp - sector > tolerance or sector_tmp < sector:
                    sector = sector_tmp
                    error_sectors.append((disk, str(sector), str(nsectors)))
    finally:
        # Release the log handle even when parsing an event fails.
        win32evtlog.CloseEventLog(hlog)
    return error_sectors
# nfi.exe: unofficial Microsoft's NTFS File Sector Information Utility.
# Base64 payload of the tool; it is written to a temporary file when no
# nfi.exe exists in the current directory.
nfi_exe = ""
error_sectors = get_disk_errors()
path = 'nfi.exe'
temp = False
result = ''
check_duplicates = {}
try:
    if not os.path.exists(path):
        fd, path = tempfile.mkstemp(suffix='_nfi.exe')
        # From here on the temporary file exists and must be removed, even
        # if decoding or writing the payload fails.
        temp = True
        try:
            os.write(fd, base64.b64decode(nfi_exe))
        finally:
            os.close(fd)
    for disk, sect, _ in error_sectors:
        if not sect or not disk:
            continue
        _, text = call_with_output([path, disk, sect])
        if not text or "Usage: nfi drive-letter" in text:
            print(f'Something went wrong! Disk: {disk}, sector: {sect}')
            break
        text = text.replace('\r', '').replace('\n\n', '\n')
        # Drop the three leading lines of the tool output, the resident
        # attribute lines, and one level of four-space indentation.
        lines = [
            line[4:] if line[:4] == "    " else line
            for line in text.split("\n")[3:]
            if ("$STANDARD_INFORMATION (resident)" not in line
                and "$FILE_NAME (resident)" not in line)
        ]
        # The file path should be in lines[2]; shorter output is keyed by
        # its whole text so that it cannot abort the report.
        key_text = lines[2] if len(lines) > 2 else '\n'.join(lines)
        md5 = hashlib.md5(key_text.encode("utf-8")).hexdigest()
        if md5 not in check_duplicates:
            check_duplicates[md5] = True
            result += '\n'.join(lines) + '\n'
finally:
    if temp:
        os.remove(path)
if result.strip():
    with open("errors.txt", "w") as f:
        f.write(result)
    # or maybe instead
    print(result)  # to use >errors.txt operator to output into a file
else:
    print("No bad sectors reported")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment