Last active
October 19, 2020 11:12
-
-
Save schlamar/7025156 to your computer and use it in GitHub Desktop.
Call CreateProcessWithLogonW from Python. https://mail.python.org/pipermail/python-win32/2009-June/009192.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes | |
from ctypes import POINTER | |
from ctypes.wintypes import HANDLE, LPVOID, WORD, DWORD, BOOL | |
LPSTR = ctypes.c_char_p | |
LPBYTE = LPSTR | |
LPHANDLE = POINTER(HANDLE) | |
LPDWORD = POINTER(DWORD) | |
NULL = 0 | |
TRUE = 1 | |
FALSE = 0 | |
ERROR_BROKEN_PIPE = 109 | |
INVALID_HANDLE_VALUE = -1 | |
HANDLE_FLAG_INHERIT = 0x0001 | |
DUPLICATE_SAME_ACCESS = 0x00000002 | |
INFINITE = 0xFFFFFFFF | |
WAIT_FAILED = 0xFFFFFFFF | |
STARTF_USESTDHANDLES = 0x0100 | |
CREATE_NO_WINDOW = 0x08000000 | |
STARTF_USESHOWWINDOW = 0x00000001 | |
SW_HIDE = 0 | |
class PROCESS_INFORMATION(ctypes.Structure): | |
_pack_ = 1 | |
_fields_ = [ | |
('hProcess', HANDLE), | |
('hThread', HANDLE), | |
('dwProcessId', DWORD), | |
('dwThreadId', DWORD), | |
] | |
class STARTUPINFO(ctypes.Structure): | |
_pack_ = 1 | |
_fields_ = [ | |
('cb', DWORD), | |
('lpReserved', DWORD), # LPSTR | |
('lpDesktop', LPSTR), | |
('lpTitle', LPSTR), | |
('dwX', DWORD), | |
('dwY', DWORD), | |
('dwXSize', DWORD), | |
('dwYSize', DWORD), | |
('dwXCountChars', DWORD), | |
('dwYCountChars', DWORD), | |
('dwFillAttribute', DWORD), | |
('dwFlags', DWORD), | |
('wShowWindow', WORD), | |
('cbReserved2', WORD), | |
('lpReserved2', DWORD), # LPBYTE | |
('hStdInput', DWORD), | |
('hStdOutput', DWORD), | |
('hStdError', DWORD), | |
] | |
class SECURITY_ATTRIBUTES(ctypes.Structure): | |
_fields_ = [("nLength", DWORD), | |
("lpSecurityDescriptor", LPVOID), | |
("bInheritHandle", BOOL)] | |
LPSECURITY_ATTRIBUTES = POINTER(SECURITY_ATTRIBUTES) | |
GetLastError = ctypes.windll.kernel32.GetLastError | |
GetLastError.argtypes = [] | |
GetLastError.restype = DWORD | |
CloseHandle = ctypes.windll.kernel32.CloseHandle | |
CloseHandle.argtypes = [HANDLE] | |
CloseHandle.restype = BOOL | |
CreatePipe = ctypes.windll.kernel32.CreatePipe | |
CreatePipe.argtypes = [POINTER(HANDLE), POINTER(HANDLE), | |
LPSECURITY_ATTRIBUTES, DWORD] | |
CreatePipe.restype = BOOL | |
SetHandleInformation = ctypes.windll.kernel32.SetHandleInformation | |
SetHandleInformation.argtypes = [HANDLE, DWORD, DWORD] | |
SetHandleInformation.restype = BOOL | |
DuplicateHandle = ctypes.windll.kernel32.DuplicateHandle | |
DuplicateHandle.argtypes = [HANDLE, HANDLE, HANDLE, LPHANDLE, | |
DWORD, BOOL, DWORD] | |
DuplicateHandle.restype = BOOL | |
GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess | |
GetCurrentProcess.argtypes = [] | |
GetCurrentProcess.restype = HANDLE | |
WaitForSingleObject = ctypes.windll.kernel32.WaitForSingleObject | |
WaitForSingleObject.argtypes = [HANDLE, DWORD] | |
WaitForSingleObject.restype = DWORD | |
ReadFile = ctypes.windll.kernel32.ReadFile | |
ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPVOID] | |
ReadFile.restype = BOOL | |
def create_pipe(): | |
saAttr = SECURITY_ATTRIBUTES() | |
saAttr.nLength = ctypes.sizeof(saAttr) | |
saAttr.bInheritHandle = True | |
saAttr.lpSecurityDescriptor = None | |
handles = HANDLE(), HANDLE() | |
if not CreatePipe(ctypes.byref(handles[0]), | |
ctypes.byref(handles[1]), ctypes.byref(saAttr), 0): | |
raise ctypes.WinError() | |
if not SetHandleInformation(handles[0], HANDLE_FLAG_INHERIT, 0): | |
raise ctypes.WinError() | |
return handles[0].value, handles[1].value | |
def read_pipe(handle): | |
result = '' | |
# Allocate the output buffer | |
data = ctypes.create_string_buffer(4096) | |
while True: | |
bytesRead = DWORD(0) | |
if not ReadFile(handle, data, 4096, | |
ctypes.byref(bytesRead), None): | |
le = GetLastError() | |
if le == ERROR_BROKEN_PIPE: | |
break | |
else: | |
raise ctypes.WinError() | |
s = data.value[0:bytesRead.value] | |
result += s.decode('utf_8', 'replace') | |
return result | |
def CreateProcessWithLogonW(lpUsername=None, lpDomain=None, | |
lpPassword=None, dwLogonFlags=0, | |
lpApplicationName=None, lpCommandLine=None, | |
dwCreationFlags=0, lpEnvironment=None, | |
lpCurrentDirectory=None, lpStartupInfo=None): | |
if not lpUsername: | |
lpUsername = NULL | |
else: | |
lpUsername = ctypes.c_wchar_p(lpUsername) | |
if not lpDomain: | |
lpDomain = NULL | |
else: | |
lpDomain = ctypes.c_wchar_p(lpDomain) | |
if not lpPassword: | |
lpPassword = NULL | |
else: | |
lpPassword = ctypes.c_wchar_p(lpPassword) | |
if not lpApplicationName: | |
lpApplicationName = NULL | |
else: | |
lpApplicationName = ctypes.c_wchar_p(lpApplicationName) | |
if not lpCommandLine: | |
lpCommandLine = NULL | |
else: | |
lpCommandLine = ctypes.create_unicode_buffer(lpCommandLine) | |
if not lpEnvironment: | |
lpEnvironment = NULL | |
else: | |
lpEnvironment = ctypes.c_wchar_p(lpEnvironment) | |
if not lpCurrentDirectory: | |
lpCurrentDirectory = NULL | |
else: | |
lpCurrentDirectory = ctypes.c_wchar_p(lpCurrentDirectory) | |
if not lpStartupInfo: | |
p_hstdout, c_hstdout = create_pipe() | |
p_hstderr, c_hstderr = create_pipe() | |
lpStartupInfo = STARTUPINFO() | |
lpStartupInfo.cb = ctypes.sizeof(STARTUPINFO) | |
lpStartupInfo.lpReserved = 0 | |
lpStartupInfo.lpDesktop = 0 | |
lpStartupInfo.lpTitle = 0 | |
lpStartupInfo.dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW | |
lpStartupInfo.cbReserved2 = 0 | |
lpStartupInfo.lpReserved2 = 0 | |
lpStartupInfo.hStdOutput = c_hstdout | |
lpStartupInfo.hStdError = c_hstderr | |
lpStartupInfo.wShowWindow = SW_HIDE | |
lpProcessInformation = PROCESS_INFORMATION() | |
lpProcessInformation.hProcess = INVALID_HANDLE_VALUE | |
lpProcessInformation.hThread = INVALID_HANDLE_VALUE | |
lpProcessInformation.dwProcessId = 0 | |
lpProcessInformation.dwThreadId = 0 | |
if not ctypes.windll.advapi32.CreateProcessWithLogonW( | |
lpUsername, lpDomain, lpPassword, dwLogonFlags, lpApplicationName, | |
ctypes.byref(lpCommandLine), dwCreationFlags, lpEnvironment, | |
lpCurrentDirectory, ctypes.byref(lpStartupInfo), | |
ctypes.byref(lpProcessInformation)): | |
raise ctypes.WinError() | |
res = WaitForSingleObject(lpProcessInformation.hProcess, INFINITE) | |
if res == WAIT_FAILED: | |
raise ctypes.WinError() | |
CloseHandle(c_hstdout) | |
out = read_pipe(p_hstdout) | |
CloseHandle(p_hstdout) | |
CloseHandle(c_hstderr) | |
err = read_pipe(p_hstderr) | |
CloseHandle(p_hstderr) | |
return out, err |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment