Skip to content

Instantly share code, notes, and snippets.

@calladoum-elastic
Last active March 18, 2024 07:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save calladoum-elastic/328068f19e60a76b00f20cdb936cd078 to your computer and use it in GitHub Desktop.
Save calladoum-elastic/328068f19e60a76b00f20cdb936cd078 to your computer and use it in GitHub Desktop.
Basic client for the ProcLaunchMon.sys device driver
#!/usr/bin/env python -i
"""
To use:
```
$ python -i ProcLaunchMon.py
>>> sess
```
"""
import os
import struct
import time
import threading
from ctypes import *
IOCTL_CODES = {
"Version": 0x22404C,
"GetNextEvent": 0x22403C,
"RemovePid": 0x228038,
"AddPid": 0x228034,
"DuplicateHandle": 0x224048,
"ResumeProcess": 0x224040,
"Shutdown": 0x224044,
"GetSessionId": 0x224050,
}
DEBUG = True
def u32(x): return struct.unpack("<I", x[0:4])[0]
def u64(x): return struct.unpack("<Q", x[0:8])[0]
def p32(x): return struct.pack("<I", x)
def p64(x): return struct.pack("<Q", x)
def ok(x): print(f"[+] {x}")
def dbg(x):
if DEBUG:
windll.kernel32.OutputDebugStringW(f"[+] {x}{os.linesep}")
class const:
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 3
DEVICE_PATH = r'''\\.\com_microsoft_idna_ProcLaunchMon'''
def hexdump(source: bytes, length=0x10, base=0):
result = []
for i in range(0, len(source), length):
chunk = bytearray(source[i:i + length])
hexa = " ".join(["%.02x" % c for c in chunk])
text = "".join([chr(b) if 0x20 <= b < 0x7F else "." for b in chunk])
result.append("{addr:#018x} {data:<{dw}} {text}".format(addr=base+i,dw=3*length,data=hexa,text=text))
return os.linesep.join(result)
class DriverSession:
def __init__(self):
DesiredAccess = const.GENERIC_READ | const.GENERIC_WRITE
self.hDevice = windll.kernel32.CreateFileW(const.DEVICE_PATH, DesiredAccess, 0, None, const.OPEN_EXISTING, 0, None)
if self.hDevice == -1:
raise IOError(f"""Cannot get handle to "{const.DEVICE_PATH}" """)
self.Version()
self.GetSessionId()
self.StartEventThread()
def __thread_routine(self):
while self.keep_running:
self.GetNextEvent()
time.sleep(1)
self.DuplicateHandle()
def StartEventThread(self):
self.keep_running = True
self.thread = threading.Thread(target=self.__thread_routine)
self.thread.daemon = True
self.thread.start()
def __del__(self):
ok(f"Stopping thread...")
self.keep_running = False
self.thread.join()
ok(f"Closing device handle...")
windll.kernel32.CloseHandle(self.hDevice)
def __repr__(self) -> str:
hDevice = self.hDevice
return f"DriverSession({hDevice=})"
def DeviceIoctlControl(self, IoctlCode: int, _in:bytes=b'', _out:bytes=b'', *args, **kwargs) -> tuple[bool, bytes]:
dwBytesReturned = c_uint32()
InputBufferSize = kwargs.get('_inlen', len(_in))
OutputBufferSize = kwargs.get('_outlen', len(_out))
InputBuffer = create_string_buffer(InputBufferSize)
OutputBuffer = create_string_buffer(OutputBufferSize)
InputBuffer.value = _in
OutputBuffer.value = _out
isSuccess = windll.kernel32.DeviceIoControl(
self.hDevice, IoctlCode, InputBuffer, InputBufferSize,
OutputBuffer, OutputBufferSize, byref(dwBytesReturned),
None
)
if isSuccess:
dbg(f'DeviceIoControl({IoctlCode=:#x}) = True, {dwBytesReturned=}')
if dwBytesReturned:
dbg(hexdump(OutputBuffer.raw))
return isSuccess, bytearray(OutputBuffer)
return isSuccess, b''
gle = GetLastError()
if gle != 0x32:
reason = FormatError(GetLastError())
dbg(f"DeviceIoControl({IoctlCode=:#x}) = False, {gle=:#x}, {reason=}")
return isSuccess, b''
def Trigger(self, ioctl, input, output):
return self.DeviceIoctlControl(ioctl, bytes(input), bytes(output))
def GetNextEvent(self):
isSuccess, out = self.Trigger(IOCTL_CODES["GetNextEvent"], b'', bytearray([0x00]*0x10))
if isSuccess:
ppid, pid, unk1, unk2 = struct.unpack("<IIII", out)
dbg(f"{pid=:d} {ppid=:d} {unk1=:#x} {unk2=:#x}")
return isSuccess
def ResumeProcess(self):
return self.Trigger(IOCTL_CODES["ResumeProcess"], bytearray([0x00]*0x4), b'')
def Shutdown(self):
return self.Trigger(IOCTL_CODES["Shutdown"], b'', b'')
def DuplicateHandle(self) -> bool:
isSuccess, out = self.Trigger(IOCTL_CODES["DuplicateHandle"], b'', bytearray([0x00]*0x8))
if out:
handle = u64(out)
dbg(f"{handle=:#x}")
return isSuccess
def Version(self):
return self.Trigger(IOCTL_CODES["Version"], b'', bytearray([0x00]*4))
def GetSessionId(self):
return self.Trigger(IOCTL_CODES["GetSessionId"], b'', bytearray([0x00]*0x10))
def AddPid(self, pid: int):
return self.Trigger(IOCTL_CODES["AddPid"], p32(pid), b'')
def RemovePid(self, pid: int):
return self.Trigger(IOCTL_CODES["RemovePid"], p32(pid), bytearray([0x00]*0x4))
sess = DriverSession()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment