Skip to content

Instantly share code, notes, and snippets.

@AndreLouisCaron
Created April 21, 2018 00:28
Show Gist options
  • Save AndreLouisCaron/39ecbab96b731c5bafb2508c2bdd1fd0 to your computer and use it in GitHub Desktop.
Save AndreLouisCaron/39ecbab96b731c5bafb2508c2bdd1fd0 to your computer and use it in GitHub Desktop.
Demonstration of race condition in call to `WaitNamedPipe()` which triggers `ERROR_FILE_NOT_FOUND`.
# -*- coding: utf-8 -*-
import logging
import os
import time
from binascii import hexlify
from contextlib import (
contextmanager,
ExitStack,
)
from ctypes import (
byref,
create_string_buffer,
windll,
WINFUNCTYPE,
)
from ctypes.wintypes import (
BOOL,
DWORD,
HANDLE,
LPCWSTR,
LPDWORD,
LPHANDLE,
LPVOID,
)
from threading import (
Event,
Thread,
)
from timeit import default_timer as clock
from queue import Queue
class Disconnected(Exception):
pass
ERROR_FILE_NOT_FOUND = 2
ERROR_BROKEN_PIPE = 109
ERROR_SEM_TIMEOUT = 121
ERROR_PIPE_BUSY = 231
ERROR_NO_DATA = 232
ERROR_PIPE_CONNECTED = 535
PIPE_UNLIMITED_INSTANCES = 255
DUPLICATE_SAME_ACCESS = 2
def syscall(name, restype, *argtypes):
prototype = WINFUNCTYPE(restype, *argtypes)
return getattr(windll.kernel32, name)
GetLastError = syscall('GetLastError', DWORD)
CloseHandle = syscall('CloseHandle', BOOL, HANDLE)
GetCurrentProcessId = syscall('GetCurrentProcessId', DWORD)
OpenProcess = syscall('OpenProcess', HANDLE, DWORD, BOOL, DWORD)
DuplicateHandle = syscall(
'DuplicateHandle',
HANDLE,
HANDLE,
HANDLE,
LPHANDLE,
DWORD,
BOOL,
DWORD,
)
CreateNamedPipe = syscall(
'CreateNamedPipeW',
HANDLE,
LPCWSTR,
DWORD,
DWORD,
DWORD,
DWORD,
DWORD,
DWORD,
LPVOID,
)
ConnectNamedPipe = syscall(
'ConnectNamedPipe',
HANDLE,
LPVOID,
)
CreateFile = syscall(
'CreateFileW',
HANDLE,
LPCWSTR,
DWORD,
DWORD,
LPVOID,
DWORD,
DWORD,
HANDLE,
)
WaitNamedPipe = syscall('WaitNamedPipeW', BOOL, LPCWSTR, DWORD)
WriteFile = syscall('WriteFile', BOOL, HANDLE, LPVOID, DWORD, LPDWORD, LPVOID)
ReadFile = syscall('ReadFile', BOOL, HANDLE, LPVOID, DWORD, LPDWORD, LPVOID)
@contextmanager
def closing(handle):
"""Automatically close the handle for a sytem resource."""
try:
yield handle
finally:
CloseHandle(handle)
@contextmanager
def server_pipe(path, max_instances=PIPE_UNLIMITED_INSTANCES):
"""Automatically bind & close a named pipe (server)."""
handle = CreateNamedPipe(
path,
0x00000003, # PIPE_ACCESS_DUPLEX
0x00000008, # PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS
max_instances,
0, # output buffer size
0, # input buffer size
5 * 1000, # default time out (ms)
None, # security attributes
)
if handle == -1:
error = GetLastError()
raise WindowsError(error)
with closing(handle):
status = ConnectNamedPipe(handle, None)
if status == 0:
error = GetLastError()
if error != ERROR_PIPE_CONNECTED:
raise WindowsError(error)
yield handle
def dup_handle(process, src):
"""Clone a pipe handle (eases error handling on transfer of ownership)."""
dst = HANDLE()
status = DuplicateHandle(
process,
src,
process,
byref(dst),
0,
False,
DUPLICATE_SAME_ACCESS,
)
if status == 0:
error = GetLastError()
raise WindowsError(error)
return dst.value
@contextmanager
def current_process():
"""Get a handle to the current process (to duplicate handles)."""
handle = OpenProcess(0x0040, False, GetCurrentProcessId())
with closing(handle):
yield handle
def listen(e, path, q):
"""Listen for incoming connections..."""
with current_process() as process:
while not e.wait(0.001):
with server_pipe(path) as pipe:
q.put(dup_handle(process, pipe))
@contextmanager
def loop_concurrently(name, fn, *args):
"""Spawn a thread that loops in the background."""
r = Event()
e = Event()
def entry_point():
try:
r.set()
fn(e, *args)
except Exception:
logging.exception('In background loop.')
thread = Thread(target=entry_point)
thread.start()
r.wait()
try:
yield
except KeyboardInterrupt:
pass
finally:
e.set()
print('Waiting for thread "%s".' % (
name,
))
thread.join()
@contextmanager
def client_pipe(path, timeout=5.0):
"""Automatically bind & close a named pipe (server)."""
ref = clock()
while True:
elapsed = clock() - ref
if elapsed >= timeout:
raise Exception('Connection time out!')
handle = CreateFile(
path,
0x80000000 | 0x40000000, # GENERIC_READ | GENERIC_WRITE
0, # do not share
None,
3, # OPEN_EXISTING
0, # no attributes
None,
)
if handle != -1:
break
error = GetLastError()
if error == ERROR_FILE_NOT_FOUND:
#print('CreateFile: race!')
# Race condition in the server loop: we tried to connect
# between calls to `ConnectNamedPipe()`.
time.sleep(0.005) # 5ms
continue
if error != ERROR_PIPE_BUSY:
raise WindowsError(error)
status = WaitNamedPipe(
path,
int(1000.0 * max(0.0, timeout - elapsed))
)
if status != 0:
continue
error = GetLastError()
if error == ERROR_FILE_NOT_FOUND:
# Race condition in the server loop: we tried to wait
# between calls to `ConnectNamedPipe()`.
print('WaitNamedPipe: race!')
time.sleep(0.001)
continue
if error == ERROR_SEM_TIMEOUT:
raise Exception('Connection time out!')
raise WindowsError(error)
with closing(handle):
yield handle
def send(pipe, data):
"""Send data over a pipe."""
size = len(data)
used = 0
while used < size:
xfer = DWORD()
status = WriteFile(
pipe,
data[used:],
size - used,
byref(xfer),
None,
)
if status == 0:
error = GetLastError()
if error in (ERROR_BROKEN_PIPE, ERROR_NO_DATA):
raise Disconnected()
raise WindowsError(error)
used += xfer.value
def recv(pipe, size):
"""Receive data over a pipe."""
data = b''
while len(data) < size:
data += _recv(pipe, size - len(data))
return data
def _recv(pipe, size):
"""Receive data over a pipe."""
data = create_string_buffer(size)
xfer = DWORD()
status = ReadFile(
pipe,
data,
size,
byref(xfer),
None,
)
if status == 0:
error = GetLastError()
if error == ERROR_BROKEN_PIPE:
raise Disconnected()
raise WindowsError(error)
return data.raw[:xfer.value]
def pound_on_pipe_server(e, path):
"""Repeatedly connect, execute a request-reply cycle and disconnect."""
while not e.wait(0.001):
with client_pipe(path) as pipe:
try:
req = os.urandom(32)
send(pipe, req)
rep = recv(pipe, 32)
assert rep == req
except Disconnected:
pass
finally:
pass
def respond_to_clients(e, q):
while not e.wait(0.001):
with closing(q.get()) as pipe:
try:
data = recv(pipe, 32)
#print('S: ECHO "%s"' % (
# hexlify(data).decode('ascii'),
#))
send(pipe, data)
except Disconnected:
pass
finally:
pass
def main():
#path = r'\\.\pipe\bug'
path = r'//./pipe/bug'
clients = 10
workers = 2
timeout = 30.0
with ExitStack() as stack:
# Start the server.
q = Queue()
stack.enter_context(loop_concurrently(
'listener', listen, path, q,
))
for i in range(workers):
stack.enter_context(loop_concurrently(
'worker-%03d' % i,
respond_to_clients, q,
))
# Run a bunch of clients.
for i in range(clients):
stack.enter_context(loop_concurrently(
'client-%03d' % i,
pound_on_pipe_server, path,
))
# Let this soak for a while.
time.sleep(timeout)
print('S: CLOSING')
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment