Skip to content

Instantly share code, notes, and snippets.

@schlamar
Last active October 19, 2020 11:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save schlamar/7025156 to your computer and use it in GitHub Desktop.
Save schlamar/7025156 to your computer and use it in GitHub Desktop.
import ctypes
from ctypes import POINTER
from ctypes.wintypes import HANDLE, LPVOID, WORD, DWORD, BOOL
# --- ctypes aliases for Win32 pointer types used by the prototypes below ---
LPSTR = ctypes.c_char_p
LPBYTE = LPSTR
LPHANDLE = POINTER(HANDLE)
LPDWORD = POINTER(DWORD)

# --- C-style literals ---
NULL = 0
FALSE = 0
TRUE = 1

# --- Error code and sentinel values ---
ERROR_BROKEN_PIPE = 109  # read_pipe() treats this last-error code as end of data
INVALID_HANDLE_VALUE = -1
WAIT_FAILED = 0xFFFFFFFF  # WaitForSingleObject() result that signals failure
INFINITE = 0xFFFFFFFF  # timeout argument meaning "no timeout"

# --- Handle flags ---
HANDLE_FLAG_INHERIT = 0x0001  # cleared on the parent's pipe end by create_pipe()
DUPLICATE_SAME_ACCESS = 0x00000002

# --- Process creation / STARTUPINFO flags ---
CREATE_NO_WINDOW = 0x08000000
STARTF_USESHOWWINDOW = 0x00000001
STARTF_USESTDHANDLES = 0x0100
SW_HIDE = 0
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes record holding the handles and ids of a created process.

    An instance is passed by reference to the process-creation call, which
    fills in the four members.
    """

    _pack_ = 1
    _fields_ = [
        # Handles to the new process and to its first thread.
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        # Numeric identifiers of that process and thread.
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the wide-character ``STARTUPINFOW`` record.

    The structure is handed to ``CreateProcessWithLogonW`` (a wide-character
    API), so its string members are wide-string pointers.

    Pointer and handle members are declared with pointer-sized ctypes types
    and the structure uses natural alignment (no ``_pack_``).  Declaring them
    as 4-byte ``DWORD`` values in a byte-packed structure only matches the
    real layout on 32-bit Windows and yields a too-small, misaligned record
    on 64-bit Windows.

    All pointer/handle members still accept ``0`` or ``None`` for NULL and a
    plain integer for a handle value.
    """

    _fields_ = [
        ('cb', DWORD),  # size of this structure in bytes
        ('lpReserved', ctypes.c_wchar_p),  # LPWSTR
        ('lpDesktop', ctypes.c_wchar_p),  # LPWSTR
        ('lpTitle', ctypes.c_wchar_p),  # LPWSTR
        ('dwX', DWORD),
        ('dwY', DWORD),
        ('dwXSize', DWORD),
        ('dwYSize', DWORD),
        ('dwXCountChars', DWORD),
        ('dwYCountChars', DWORD),
        ('dwFillAttribute', DWORD),
        ('dwFlags', DWORD),  # STARTF_* bit mask
        ('wShowWindow', WORD),  # SW_* value
        ('cbReserved2', WORD),
        ('lpReserved2', LPVOID),  # LPBYTE
        ('hStdInput', HANDLE),
        ('hStdOutput', HANDLE),
        ('hStdError', HANDLE),
    ]
class SECURITY_ATTRIBUTES(ctypes.Structure):
    """ctypes record passed to ``CreatePipe`` to request inheritable handles."""

    _fields_ = [
        ('nLength', DWORD),  # size of this structure in bytes
        ('lpSecurityDescriptor', LPVOID),  # NULL selects the default descriptor
        ('bInheritHandle', BOOL),  # non-zero makes the created handles inheritable
    ]


# Pointer-to-structure type used in the CreatePipe prototype.
LPSECURITY_ATTRIBUTES = POINTER(SECURITY_ATTRIBUTES)
# Prototypes for the kernel32 entry points used in this module.  Declaring
# argtypes/restype lets ctypes convert and check every argument.
_kernel32 = ctypes.windll.kernel32

# DWORD GetLastError(void)
GetLastError = _kernel32.GetLastError
GetLastError.restype = DWORD
GetLastError.argtypes = []

# BOOL CloseHandle(HANDLE)
CloseHandle = _kernel32.CloseHandle
CloseHandle.restype = BOOL
CloseHandle.argtypes = [HANDLE]

# BOOL CreatePipe(PHANDLE read, PHANDLE write, LPSECURITY_ATTRIBUTES, DWORD size)
CreatePipe = _kernel32.CreatePipe
CreatePipe.restype = BOOL
CreatePipe.argtypes = [
    LPHANDLE,
    LPHANDLE,
    LPSECURITY_ATTRIBUTES,
    DWORD,
]

# BOOL SetHandleInformation(HANDLE, DWORD mask, DWORD flags)
SetHandleInformation = _kernel32.SetHandleInformation
SetHandleInformation.restype = BOOL
SetHandleInformation.argtypes = [HANDLE, DWORD, DWORD]

# BOOL DuplicateHandle(HANDLE, HANDLE, HANDLE, LPHANDLE, DWORD, BOOL, DWORD)
DuplicateHandle = _kernel32.DuplicateHandle
DuplicateHandle.restype = BOOL
DuplicateHandle.argtypes = [
    HANDLE,
    HANDLE,
    HANDLE,
    LPHANDLE,
    DWORD,
    BOOL,
    DWORD,
]

# HANDLE GetCurrentProcess(void)
GetCurrentProcess = _kernel32.GetCurrentProcess
GetCurrentProcess.restype = HANDLE
GetCurrentProcess.argtypes = []

# DWORD WaitForSingleObject(HANDLE, DWORD milliseconds)
WaitForSingleObject = _kernel32.WaitForSingleObject
WaitForSingleObject.restype = DWORD
WaitForSingleObject.argtypes = [HANDLE, DWORD]

# BOOL ReadFile(HANDLE, LPVOID buffer, DWORD size, LPDWORD read, LPVOID overlapped)
ReadFile = _kernel32.ReadFile
ReadFile.restype = BOOL
ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPVOID]
def create_pipe():
    """Create an anonymous pipe whose write end can be inherited by a child.

    The pipe is created with inheritable handles, then the inherit flag is
    cleared on the read end so only the write end is passed on.

    Returns:
        A ``(read_handle, write_handle)`` tuple of raw handle values.

    Raises:
        OSError: (``ctypes.WinError``) if the pipe cannot be created or the
            read end cannot be made non-inheritable.  In the latter case both
            handles are closed before the error is raised.
    """
    security = SECURITY_ATTRIBUTES()
    security.nLength = ctypes.sizeof(security)
    security.bInheritHandle = True
    security.lpSecurityDescriptor = None

    read_end, write_end = HANDLE(), HANDLE()
    if not CreatePipe(ctypes.byref(read_end), ctypes.byref(write_end),
                      ctypes.byref(security), 0):
        raise ctypes.WinError()

    if not SetHandleInformation(read_end, HANDLE_FLAG_INHERIT, 0):
        # Build the exception first: CloseHandle may overwrite the thread's
        # last-error value that WinError() reads.
        failure = ctypes.WinError()
        CloseHandle(read_end)
        CloseHandle(write_end)
        raise failure

    return read_end.value, write_end.value
def read_pipe(handle):
    """Read everything from a pipe handle and return it as text.

    Data is read in 4096-byte chunks until ``ReadFile`` fails with
    ``ERROR_BROKEN_PIPE``, which ends the loop.

    The raw bytes of every chunk are collected and decoded once at the end.
    This keeps NUL bytes in the output and avoids corrupting multi-byte
    UTF-8 sequences that straddle two reads.

    Args:
        handle: Read end of the pipe.

    Returns:
        The collected data decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        OSError: (``ctypes.WinError``) if ``ReadFile`` fails with any error
            other than ``ERROR_BROKEN_PIPE``.
    """
    chunk_size = 4096
    buffer = ctypes.create_string_buffer(chunk_size)
    bytes_read = DWORD(0)
    chunks = []
    while True:
        if not ReadFile(handle, buffer, chunk_size,
                        ctypes.byref(bytes_read), None):
            error_code = GetLastError()
            if error_code == ERROR_BROKEN_PIPE:
                break
            # Pass the code we already fetched so the exception cannot pick
            # up a different last-error value.
            raise ctypes.WinError(error_code)
        # .raw exposes the whole buffer; .value would stop at the first NUL.
        chunks.append(buffer.raw[:bytes_read.value])
    return b''.join(chunks).decode('utf_8', 'replace')
def _wide_or_null(text):
    """Return *text* as a ``c_wchar_p``, or ``None`` (NULL pointer) if empty."""
    return ctypes.c_wchar_p(text) if text else None


def CreateProcessWithLogonW(lpUsername=None, lpDomain=None,
                            lpPassword=None, dwLogonFlags=0,
                            lpApplicationName=None, lpCommandLine=None,
                            dwCreationFlags=0, lpEnvironment=None,
                            lpCurrentDirectory=None, lpStartupInfo=None):
    """Run a process under other credentials and wait for it to finish.

    Thin wrapper around ``advapi32.CreateProcessWithLogonW``.  Empty or
    omitted string arguments are passed as NULL pointers.

    Args:
        lpUsername, lpDomain, lpPassword: Credentials for the logon.
        dwLogonFlags: Logon option flags, passed through unchanged.
        lpApplicationName: Executable to run.
        lpCommandLine: Command line; copied into a writable buffer.
        dwCreationFlags: Process creation flags.  When *lpEnvironment* is
            given, ``CREATE_UNICODE_ENVIRONMENT`` is added because the block
            is passed as wide characters.
        lpEnvironment: Environment block as one string of NUL-separated
            ``NAME=value`` entries.  The terminating NULs are added if
            missing.
        lpCurrentDirectory: Working directory for the new process.
        lpStartupInfo: Optional ``STARTUPINFO``.  When omitted, a hidden
            window is requested and stdout/stderr are redirected to pipes.

    Returns:
        ``(out, err)`` with the decoded stdout and stderr of the process
        when this function created the pipes, or ``(None, None)`` when the
        caller supplied *lpStartupInfo*.

    Raises:
        OSError: (``ctypes.WinError``) if a pipe cannot be created, the
            process cannot be started, reading a pipe fails, or waiting for
            the process fails.
    """
    # Function-local import: the module's import block only pulls in ctypes.
    import threading

    CREATE_UNICODE_ENVIRONMENT = 0x00000400

    # The command line must live in a writable buffer; None stands for NULL.
    command_line = None
    if lpCommandLine:
        command_buffer = ctypes.create_unicode_buffer(lpCommandLine)
        command_line = ctypes.byref(command_buffer)

    # An environment block contains embedded NULs, which c_wchar_p rejects,
    # so it is copied into a buffer.  The buffer appends one NUL; make sure
    # the text itself ends with one so the block is double-NUL terminated.
    environment = None
    if lpEnvironment:
        if not lpEnvironment.endswith('\0'):
            lpEnvironment += '\0'
        environment_buffer = ctypes.create_unicode_buffer(lpEnvironment)
        environment = ctypes.byref(environment_buffer)
        dwCreationFlags |= CREATE_UNICODE_ENVIRONMENT

    capture_output = not lpStartupInfo
    owned_handles = []  # every handle this call still has to close

    def release(handle):
        """Close *handle* now and drop it from the cleanup list."""
        owned_handles.remove(handle)
        CloseHandle(handle)

    out = err = None
    try:
        if capture_output:
            p_hstdout, c_hstdout = create_pipe()
            owned_handles.extend((p_hstdout, c_hstdout))
            p_hstderr, c_hstderr = create_pipe()
            owned_handles.extend((p_hstderr, c_hstderr))

            # A fresh structure is zero-filled, so only the members that
            # differ from zero/NULL are assigned.
            lpStartupInfo = STARTUPINFO()
            lpStartupInfo.cb = ctypes.sizeof(STARTUPINFO)
            lpStartupInfo.dwFlags = (STARTF_USESTDHANDLES
                                     | STARTF_USESHOWWINDOW)
            lpStartupInfo.wShowWindow = SW_HIDE
            lpStartupInfo.hStdOutput = c_hstdout
            lpStartupInfo.hStdError = c_hstderr

        process_info = PROCESS_INFORMATION()
        if not ctypes.windll.advapi32.CreateProcessWithLogonW(
                _wide_or_null(lpUsername), _wide_or_null(lpDomain),
                _wide_or_null(lpPassword), DWORD(dwLogonFlags),
                _wide_or_null(lpApplicationName), command_line,
                DWORD(dwCreationFlags), environment,
                _wide_or_null(lpCurrentDirectory),
                ctypes.byref(lpStartupInfo),
                ctypes.byref(process_info)):
            raise ctypes.WinError()
        owned_handles.extend((process_info.hThread, process_info.hProcess))

        if capture_output:
            # Close our copies of the write ends so the reads below end
            # once the child side has closed its copies.
            release(c_hstdout)
            release(c_hstderr)

            # Drain both pipes at the same time, and before waiting for the
            # process: a child that fills one pipe while nobody reads it
            # would block forever.
            stderr_outcome = {}

            def drain_stderr():
                try:
                    stderr_outcome['text'] = read_pipe(p_hstderr)
                except Exception as exc:  # re-raised in the calling thread
                    stderr_outcome['error'] = exc

            reader = threading.Thread(target=drain_stderr)
            reader.start()
            try:
                out = read_pipe(p_hstdout)
            finally:
                reader.join()
            if 'error' in stderr_outcome:
                raise stderr_outcome['error']
            err = stderr_outcome['text']

        if WaitForSingleObject(process_info.hProcess,
                               INFINITE) == WAIT_FAILED:
            raise ctypes.WinError()
    finally:
        for handle in owned_handles:
            CloseHandle(handle)
    return out, err
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment