Skip to content

Instantly share code, notes, and snippets.

@aont
Created March 19, 2020 05:37
Show Gist options
  • Save aont/5066a47641ebb250f44ae4205d313a31 to your computer and use it in GitHub Desktop.
Save aont/5066a47641ebb250f44ae4205d313a31 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import sys
import msvcrt
import os
import ctypes
import shutil
# Handle-like Win32 types are all modelled as untyped pointers.
handle_t = ctypes.c_void_p
LPSECURITY_ATTRIBUTES = ctypes.c_void_p
LPOVERLAPPED = ctypes.c_void_p

# ctypes hands a c_void_p result back as a plain int (or None), so keep the
# integer form of handle_t(-1) around for comparing against returned handles.
INVALID_HANDLE_VALUE = handle_t(-1).value

# Flag values passed to CreateNamedPipeW (open mode / pipe mode).
PIPE_ACCESS_INBOUND = 0x00000001
PIPE_ACCESS_OUTBOUND = 0x00000002
PIPE_TYPE_BYTE = 0x00000000
PIPE_WAIT = 0x00000000

kernel32 = ctypes.WinDLL("kernel32")

# CreateNamedPipeW(name, open mode, pipe mode, max instances, out buffer
# size, in buffer size, default timeout, security attributes) -> handle.
# The six numeric parameters are all declared as 32-bit ints.
kernel32.CreateNamedPipeW.argtypes = (
    (ctypes.c_wchar_p,) + (ctypes.c_int32,) * 6 + (LPSECURITY_ATTRIBUTES,)
)
kernel32.CreateNamedPipeW.restype = handle_t

# The remaining calls report success/failure through their return value.
# NOTE(review): Win32 declares these as returning BOOL (a 32-bit int) while
# ctypes.c_bool is a one-byte type -- confirm this choice is intentional.
kernel32.ConnectNamedPipe.argtypes = (handle_t, LPOVERLAPPED)
kernel32.ConnectNamedPipe.restype = ctypes.c_bool

kernel32.DisconnectNamedPipe.argtypes = (handle_t,)
kernel32.DisconnectNamedPipe.restype = ctypes.c_bool

kernel32.CloseHandle.argtypes = (handle_t,)
kernel32.CloseHandle.restype = ctypes.c_bool
class NamedPipeWriter:
    """Server end of an outbound (write-only) Windows named pipe.

    The pipe is created in the constructor, a client is awaited with
    ``connect()``, and data is written through the binary file object
    returned by ``get_file()``.  Instances can also be used as context
    managers, in which case ``close()`` runs on exit.
    """

    # GetLastError() value reported by ConnectNamedPipe when a client had
    # already connected before the call was made; this is not a failure.
    _ERROR_PIPE_CONNECTED = 535

    def __init__(self, pipe_path):
        """Create the pipe and wrap its handle in a buffered binary writer.

        Args:
            pipe_path: Pipe name passed to CreateNamedPipeW as a wide string
                (presumably of the form ``\\\\.\\pipe\\<name>`` -- confirm
                against callers).

        Raises:
            Exception: If CreateNamedPipeW returns INVALID_HANDLE_VALUE.
            OSError: If the handle cannot be turned into a file descriptor.
        """
        pipe_access = PIPE_ACCESS_OUTBOUND
        # Positional arguments after the mode flags: max instances = 1,
        # out/in buffer sizes = 0, default timeout = 100, no security attrs.
        self.hPipe = kernel32.CreateNamedPipeW(
            pipe_path,
            pipe_access,
            PIPE_TYPE_BYTE | PIPE_WAIT,
            1,
            0,
            0,
            100,
            None,
        )
        if self.hPipe == INVALID_HANDLE_VALUE:
            raise Exception("CreateNamedPipeW")
        try:
            # From here on the file descriptor owns the OS handle: closing
            # the descriptor (or the file object built on it) closes hPipe.
            self.fd_pipe = msvcrt.open_osfhandle(self.hPipe, 0)
        except OSError:
            # Ownership was never transferred, so release the handle here
            # instead of leaking it.
            kernel32.CloseHandle(self.hPipe)
            raise
        self.f_pipe = os.fdopen(self.fd_pipe, "wb")

    def __enter__(self):
        """Return this writer so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the pipe when the ``with`` block ends."""
        self.close()

    def get_file(self):
        """Return the binary file object that writes into the pipe."""
        return self.f_pipe

    def connect(self):
        """Wait for a client to connect to the pipe.

        Returns:
            True if a client is connected, False if ConnectNamedPipe failed.
            A client that connected between pipe creation and this call is
            reported by Windows as ERROR_PIPE_CONNECTED and counts as
            success.
        """
        if kernel32.ConnectNamedPipe(self.hPipe, None):
            return True
        # Read the error code immediately after the failed call, before any
        # other Win32 call can overwrite it.
        return ctypes.GetLastError() == self._ERROR_PIPE_CONNECTED

    def close(self):
        """Flush pending data, disconnect the client and release the pipe.

        Calling ``close()`` again after the file has been closed is a no-op.

        The handle is owned by the file object (see ``__init__``), so it is
        released exactly once by closing that file; calling CloseHandle on
        it as well would operate on an already-closed handle.

        Raises:
            Exception: If DisconnectNamedPipe fails.  The file (and with it
                the pipe handle) is still closed in that case.
        """
        if self.f_pipe.closed:
            return
        try:
            # Push Python-level buffered bytes into the pipe first.
            self.f_pipe.flush()
            # Disconnecting discards unread data, so wait until the client
            # has read everything.  Best effort: the result is ignored
            # because this can fail when no client is attached.  The handle
            # is wrapped explicitly since no argtypes are declared for
            # FlushFileBuffers.
            kernel32.FlushFileBuffers(handle_t(self.hPipe))
            if not kernel32.DisconnectNamedPipe(self.hPipe):
                raise Exception("DisconnectNamedPipe")
        finally:
            # Closes the descriptor and therefore the underlying handle.
            self.f_pipe.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment