Skip to content

Instantly share code, notes, and snippets.

@ethanhs
Last active October 24, 2018 19:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ethanhs/c88752c7b1605fe04596c1c4848c88f3 to your computer and use it in GitHub Desktop.
Save ethanhs/c88752c7b1605fe04596c1c4848c88f3 to your computer and use it in GitHub Desktop.
An experiment in writing clean wrappers around Windows named pipes using the `_winapi` module.
import time
import sys
from namedpipe import *
if __name__ == '__main__':
    # Demo entry point: run with "s" for the server side, "c" for the client.
    if len(sys.argv) < 2:
        print("need s or c as argument")
    elif sys.argv[1] == "s":
        # Server: send a countdown from 10 to 1, one message per second.
        with NamedPipeServer('testFoo') as server:
            for count in range(10, 0, -1):
                server.write(str(count))
                time.sleep(1)
    elif sys.argv[1] == "c":
        # Client: show every message until the server closes its end.
        with NamedPipeClient('testFoo') as client:
            while True:
                try:
                    message = client.read()
                except BrokenPipeError:
                    # The server closed the pipe, so there is nothing left
                    # to read; leave the loop instead of dying with a
                    # traceback.
                    break
                print(message)
    else:
        print(f"no can do: {sys.argv[1]}")
"""A clean wrapper around Windows APIs allowing interprocess communication over named pipes
Here is a clear, simple explanation of the incantation needed to do this correctly:
* From the client, spawn the server.
* The server should CreateNamedPipe, then ConnectNamedPipe
* The client must poll for the pipe, then open with CreateFile
* Then each can read/write from the pipe as needed.
* Finally, the client should close the pipe.
"""
import io
import time
from types import TracebackType
from typing import Optional, Type
import _winapi
import time
from contextlib import ContextDecorator
class NamedPipeBase(io.StringIO):
    """Shared read/write/close logic for one end of a Windows named pipe.

    Subclasses are expected to store an open pipe handle in ``handle``.
    Until they do, ``read`` and ``write`` raise ``WindowsError``.
    """

    # Pipe handle; stays ``_winapi.NULL`` while no pipe is open.
    handle = _winapi.NULL
    # Default number of bytes requested by ``read``; the server subclass in
    # this file also passes it to CreateNamedPipe as both buffer sizes.
    buffer_size = 2**16

    def __init__(self, name: str) -> None:
        """Store the full pipe path derived from the short pipe *name*."""
        # Initialise the StringIO base so inherited methods operate on a
        # properly constructed object.
        super().__init__()
        self.name = r'\\.\pipe\{}'.format(name)

    def read(self, size: Optional[int] = None) -> str:
        """Read from the pipe and return the decoded text.

        Args:
            size: Maximum number of bytes to request. ``None`` or ``0``
                falls back to ``buffer_size``.

        Raises:
            WindowsError: If no pipe handle is open.
        """
        if self.handle == _winapi.NULL:
            raise WindowsError("Cannot read pipe")
        data, _status = _winapi.ReadFile(
            self.handle, size if size else self.buffer_size
        )
        return data.decode()

    def write(self, msg: str) -> int:
        """Encode *msg*, write it to the pipe and return the bytes written.

        Raises:
            WindowsError: If no pipe handle is open.
        """
        if self.handle == _winapi.NULL:
            raise WindowsError("Cannot write pipe")
        # The third positional argument of WriteFile is the ``overlapped``
        # flag, not a length, so it must not be given the message size.
        written, _err = _winapi.WriteFile(self.handle, msg.encode())
        return written

    def __exit__(self,
                 exc_ty: Optional[Type[BaseException]] = None,
                 exc_val: Optional[BaseException] = None,
                 exc_tb: Optional[TracebackType] = None,
                 ) -> bool:
        """Close the pipe handle, if any; never suppresses exceptions."""
        if self.handle != _winapi.NULL:
            _winapi.CloseHandle(self.handle)
            # Forget the closed handle so it is neither closed twice nor
            # used by a later read/write.
            self.handle = _winapi.NULL
        return False
class NamedPipeServer(NamedPipeBase):
    """Server end of a named pipe.

    The pipe is created in ``__init__``; entering the context manager waits
    for a client to connect.
    """

    def __init__(self, name: str) -> None:
        """Create a single-instance, duplex, message-mode pipe called *name*.

        Raises:
            OSError: If the pipe cannot be created (raised by ``_winapi``).
        """
        super().__init__(name)
        # _winapi.CreateNamedPipe reports failure by raising OSError, so no
        # GetLastError() check follows: after a successful call the
        # thread's last-error value may be stale and would cause spurious
        # failures.
        self.handle = _winapi.CreateNamedPipe(
            self.name,
            _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_WAIT,
            1,  # one instance
            self.buffer_size,
            self.buffer_size,
            0,
            _winapi.NULL,
        )

    def __enter__(self) -> 'NamedPipeServer':
        """Wait for a client connection and return this server."""
        try:
            _winapi.ConnectNamedPipe(self.handle, _winapi.NULL)
        except WindowsError as err:
            # A client that connected between CreateNamedPipe and this call
            # is reported as ERROR_PIPE_CONNECTED; the pipe is usable, so
            # this is not a failure.
            if err.winerror != _winapi.ERROR_PIPE_CONNECTED:
                raise
        return self
class NamedPipeClient(NamedPipeBase):
    """Client end of a named pipe, opened with ``CreateFile``."""

    def __init__(self, name: str) -> None:
        """Open the existing pipe called *name* and switch it to message mode.

        The open is retried once per second while the pipe is busy or the
        wait timed out.

        Raises:
            WindowsError: If the pipe does not exist, or if opening or
                configuring it fails for any non-retryable reason.
        """
        super().__init__(name)
        # Errors that mean "try again shortly" rather than "give up".
        retryable = (_winapi.ERROR_SEM_TIMEOUT, _winapi.ERROR_PIPE_BUSY)
        while True:
            try:
                handle = _winapi.CreateFile(
                    self.name,
                    _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0,
                    _winapi.NULL,
                    _winapi.OPEN_EXISTING,
                    0,
                    _winapi.NULL,
                )
            except FileNotFoundError as err:
                raise WindowsError(
                    "Unable to open connection to pipe at {}".format(self.name)
                ) from err
            except WindowsError as err:
                if err.winerror not in retryable:
                    # Propagate instead of silently leaving the object
                    # without a handle.
                    raise
                time.sleep(1)
            else:
                break
        try:
            # ``None`` leaves the collection count and timeout unchanged; an
            # integer 0 would be passed on as a real value.
            _winapi.SetNamedPipeHandleState(
                handle, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        except WindowsError:
            # Do not leak the freshly opened handle if it cannot be configured.
            _winapi.CloseHandle(handle)
            raise
        self.handle = handle

    def __enter__(self) -> 'NamedPipeClient':
        """Return this client; the pipe is already open after ``__init__``."""
        return self
from namedpipe import NamedPipeServer
from shared import PIPE_NAME
import time
import sys
# Open the server end of the shared pipe; entering the block runs
# NamedPipeServer.__enter__ before the body executes.
with NamedPipeServer(PIPE_NAME) as server:
    # Pause for one second, then send the greeting.
    time.sleep(1)
    server.write('Hello from the server side.')
# Pipe name imported by the server script and passed to NamedPipeServer.
PIPE_NAME = 'mypy_pipe'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment