Created
April 21, 2018 00:28
-
-
Save AndreLouisCaron/39ecbab96b731c5bafb2508c2bdd1fd0 to your computer and use it in GitHub Desktop.
Demonstration of a race condition in calls to `WaitNamedPipe()`, which can fail with `ERROR_FILE_NOT_FOUND`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import logging | |
import os | |
import time | |
from binascii import hexlify | |
from contextlib import ( | |
contextmanager, | |
ExitStack, | |
) | |
from ctypes import ( | |
byref, | |
create_string_buffer, | |
windll, | |
WINFUNCTYPE, | |
) | |
from ctypes.wintypes import ( | |
BOOL, | |
DWORD, | |
HANDLE, | |
LPCWSTR, | |
LPDWORD, | |
LPHANDLE, | |
LPVOID, | |
) | |
from threading import ( | |
Event, | |
Thread, | |
) | |
from timeit import default_timer as clock | |
from queue import Queue | |
class Disconnected(Exception):
    """Raised by `send()` / `recv()` when the peer end of the pipe is gone."""
# Win32 error codes (as returned by `GetLastError()`) that this script
# inspects explicitly.
ERROR_FILE_NOT_FOUND = 2
ERROR_BROKEN_PIPE = 109
ERROR_SEM_TIMEOUT = 121
ERROR_PIPE_BUSY = 231
ERROR_NO_DATA = 232
ERROR_PIPE_CONNECTED = 535

# `nMaxInstances` value passed to `CreateNamedPipe()` by default.
PIPE_UNLIMITED_INSTANCES = 255

# `dwOptions` flag passed to `DuplicateHandle()`.
DUPLICATE_SAME_ACCESS = 2
def syscall(name, restype, *argtypes):
    """Look up the `kernel32` export called `name`.

    `restype` and `argtypes` describe the intended signature and are used
    to build a `WINFUNCTYPE` prototype.

    NOTE(review): the prototype is built but never attached to the
    returned function, so the declared signature is not enforced and calls
    go through ctypes' default argument/return conversions.  The rest of
    this file relies on that (e.g. comparing returned handles with `-1`),
    so attaching the prototype needs the call sites reviewed as well.
    """
    # Still instantiated so that an invalid type in the declaration fails
    # at import time, exactly as before.
    WINFUNCTYPE(restype, *argtypes)
    kernel32 = windll.kernel32
    return getattr(kernel32, name)
# Bindings for the `kernel32` functions used below.  Each declaration is
# `syscall(export_name, return_type, *argument_types)`.

GetLastError = syscall('GetLastError', DWORD)
CloseHandle = syscall('CloseHandle', BOOL, HANDLE)
GetCurrentProcessId = syscall('GetCurrentProcessId', DWORD)
OpenProcess = syscall('OpenProcess', HANDLE, DWORD, BOOL, DWORD)

# BOOL DuplicateHandle(hSourceProcess, hSourceHandle, hTargetProcess,
#                      lpTargetHandle, dwDesiredAccess, bInheritHandle,
#                      dwOptions)
# The return type was previously missing from this declaration, which
# shifted every argument type by one position.
DuplicateHandle = syscall(
    'DuplicateHandle',
    BOOL,
    HANDLE,
    HANDLE,
    HANDLE,
    LPHANDLE,
    DWORD,
    BOOL,
    DWORD,
)

# HANDLE CreateNamedPipeW(lpName, dwOpenMode, dwPipeMode, nMaxInstances,
#                         nOutBufferSize, nInBufferSize, nDefaultTimeOut,
#                         lpSecurityAttributes)
CreateNamedPipe = syscall(
    'CreateNamedPipeW',
    HANDLE,
    LPCWSTR,
    DWORD,
    DWORD,
    DWORD,
    DWORD,
    DWORD,
    DWORD,
    LPVOID,
)

# BOOL ConnectNamedPipe(hNamedPipe, lpOverlapped)
# The return type was previously missing from this declaration as well.
ConnectNamedPipe = syscall(
    'ConnectNamedPipe',
    BOOL,
    HANDLE,
    LPVOID,
)

# HANDLE CreateFileW(lpFileName, dwDesiredAccess, dwShareMode,
#                    lpSecurityAttributes, dwCreationDisposition,
#                    dwFlagsAndAttributes, hTemplateFile)
CreateFile = syscall(
    'CreateFileW',
    HANDLE,
    LPCWSTR,
    DWORD,
    DWORD,
    LPVOID,
    DWORD,
    DWORD,
    HANDLE,
)

WaitNamedPipe = syscall('WaitNamedPipeW', BOOL, LPCWSTR, DWORD)
WriteFile = syscall('WriteFile', BOOL, HANDLE, LPVOID, DWORD, LPDWORD, LPVOID)
ReadFile = syscall('ReadFile', BOOL, HANDLE, LPVOID, DWORD, LPDWORD, LPVOID)
@contextmanager
def closing(handle):
    """Yield `handle`, then release it with `CloseHandle()` on exit.

    The handle is closed whether the `with` body completes normally or
    raises.
    """
    try:
        yield handle
    finally:
        CloseHandle(handle)
@contextmanager
def server_pipe(path, max_instances=PIPE_UNLIMITED_INSTANCES):
    """Create a named pipe instance at `path` and wait for one client.

    Yields the connected pipe handle; the handle is closed when the `with`
    block exits.  Raises `WindowsError` carrying the `GetLastError()` code
    if the pipe cannot be created or the connection attempt fails.
    """
    open_mode = 0x00000003  # PIPE_ACCESS_DUPLEX
    # PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS
    pipe_mode = 0x00000008
    default_timeout_ms = 5 * 1000

    handle = CreateNamedPipe(
        path,
        open_mode,
        pipe_mode,
        max_instances,
        0,  # output buffer size
        0,  # input buffer size
        default_timeout_ms,
        None,  # security attributes
    )
    if handle == -1:
        raise WindowsError(GetLastError())

    with closing(handle):
        connected = ConnectNamedPipe(handle, None)
        if connected == 0:
            code = GetLastError()
            # ERROR_PIPE_CONNECTED is not treated as a failure: the pipe
            # is handed to the caller as usual.
            if code != ERROR_PIPE_CONNECTED:
                raise WindowsError(code)
        yield handle
def dup_handle(process, src):
    """Duplicate `src` within `process` and return the new handle value.

    Having a second handle lets the original be closed by its own context
    manager while ownership of the copy moves elsewhere.  Raises
    `WindowsError` carrying the `GetLastError()` code if duplication fails.
    """
    duplicate = HANDLE()
    succeeded = DuplicateHandle(
        process,               # source process
        src,                   # handle to copy
        process,               # target process (same one)
        byref(duplicate),      # receives the new handle
        0,                     # desired access
        False,                 # not inheritable
        DUPLICATE_SAME_ACCESS,
    )
    if succeeded == 0:
        raise WindowsError(GetLastError())
    return duplicate.value
@contextmanager
def current_process():
    """Open a handle to the current process for use with `dup_handle()`.

    Yields the process handle and closes it when the `with` block exits.
    Raises `WindowsError` carrying the `GetLastError()` code if the
    process cannot be opened; previously a failed `OpenProcess()` went
    unnoticed and a null handle was yielded.
    """
    handle = OpenProcess(
        0x0040,  # PROCESS_DUP_HANDLE
        False,   # handle is not inheritable
        GetCurrentProcessId(),
    )
    # `OpenProcess()` signals failure with a NULL handle (0, or None when a
    # pointer-typed return is in effect), not with -1.
    if not handle:
        raise WindowsError(GetLastError())
    with closing(handle):
        yield handle
def listen(e, path, q):
    """Accept pipe connections on `path` until the event `e` is set.

    For every connection, a duplicate of the pipe handle is put on `q`;
    the original handle is closed when `server_pipe()` exits, so whoever
    takes the duplicate off the queue owns it.
    """
    with current_process() as process:
        while True:
            if e.wait(0.001):
                break
            with server_pipe(path) as pipe:
                duplicate = dup_handle(process, pipe)
                q.put(duplicate)
@contextmanager
def loop_concurrently(name, fn, *args):
    """Run `fn(stop_event, *args)` in a background thread for a `with` block.

    The block is entered once the thread has started.  On exit the stop
    event is set and the thread is joined.  A `KeyboardInterrupt` raised in
    the block is swallowed after that cleanup.  Exceptions raised by `fn`
    are logged rather than propagated.  `name` is only used in the progress
    message printed at shutdown.
    """
    started = Event()
    stop = Event()

    def run():
        started.set()
        try:
            fn(stop, *args)
        except Exception:
            logging.exception('In background loop.')

    worker = Thread(target=run)
    worker.start()
    started.wait()
    try:
        yield
    except KeyboardInterrupt:
        pass
    finally:
        stop.set()
        print('Waiting for thread "%s".' % (name,))
        worker.join()
@contextmanager
def client_pipe(path, timeout=5.0):
    """Automatically connect to & close a named pipe (client).

    Retries until a connection to the pipe at `path` is opened or
    `timeout` seconds have elapsed.  Yields the pipe handle and closes it
    when the `with` block exits.

    Raises `Exception('Connection time out!')` when the deadline passes,
    and `WindowsError` carrying the `GetLastError()` code for any
    unexpected failure.
    """
    ref = clock()
    while True:
        elapsed = clock() - ref
        if elapsed >= timeout:
            raise Exception('Connection time out!')
        handle = CreateFile(
            path,
            0x80000000 | 0x40000000,  # GENERIC_READ | GENERIC_WRITE
            0,     # do not share
            None,  # security attributes
            3,     # OPEN_EXISTING
            0,     # no attributes
            None,  # template file
        )
        if handle != -1:
            break
        error = GetLastError()
        if error == ERROR_FILE_NOT_FOUND:
            # Race condition in the server loop: we tried to connect
            # between calls to `ConnectNamedPipe()`.
            time.sleep(0.005)  # 5ms
            continue
        if error != ERROR_PIPE_BUSY:
            raise WindowsError(error)
        # All instances are busy: wait for one to become available.  A
        # timeout of 0 means "use the server's default wait" to
        # `WaitNamedPipe()`, so never let the remaining time truncate to 0.
        remaining_ms = max(1, int(1000.0 * (timeout - elapsed)))
        status = WaitNamedPipe(path, remaining_ms)
        if status != 0:
            continue
        error = GetLastError()
        if error == ERROR_FILE_NOT_FOUND:
            # Race condition in the server loop: we tried to wait
            # between calls to `ConnectNamedPipe()`.
            print('WaitNamedPipe: race!')
            time.sleep(0.001)
            continue
        if error == ERROR_SEM_TIMEOUT:
            raise Exception('Connection time out!')
        raise WindowsError(error)
    with closing(handle):
        yield handle
def send(pipe, data):
    """Write all of `data` to `pipe`.

    Calls `WriteFile()` repeatedly, advancing by the byte count it reports
    each time, until everything has been written.  Raises `Disconnected`
    on `ERROR_BROKEN_PIPE` / `ERROR_NO_DATA` and `WindowsError` for any
    other failure.
    """
    total = len(data)
    sent = 0
    while sent < total:
        written = DWORD()
        status = WriteFile(pipe, data[sent:], total - sent, byref(written), None)
        if status == 0:
            code = GetLastError()
            if code in (ERROR_BROKEN_PIPE, ERROR_NO_DATA):
                raise Disconnected()
            raise WindowsError(code)
        sent += written.value
def recv(pipe, size):
    """Read exactly `size` bytes from `pipe` and return them.

    Keeps calling `_recv()` for the outstanding byte count until the
    requested amount has been collected.
    """
    chunks = []
    pending = size
    while pending > 0:
        chunk = _recv(pipe, pending)
        chunks.append(chunk)
        pending -= len(chunk)
    return b''.join(chunks)
def _recv(pipe, size):
    """Perform one `ReadFile()` call and return the bytes it produced.

    At most `size` bytes are returned.  Raises `Disconnected` on
    `ERROR_BROKEN_PIPE` and `WindowsError` for any other failure.
    """
    buffer = create_string_buffer(size)
    count = DWORD()
    status = ReadFile(pipe, buffer, size, byref(count), None)
    if status != 0:
        # Only the first `count` bytes of the buffer were filled in.
        return buffer.raw[:count.value]
    code = GetLastError()
    if code == ERROR_BROKEN_PIPE:
        raise Disconnected()
    raise WindowsError(code)
def pound_on_pipe_server(e, path):
    """Connect, run one echo round-trip, disconnect; repeat until `e` is set.

    Each round sends 32 random bytes and asserts that the same 32 bytes
    come back.  A `Disconnected` error during the exchange simply ends
    that round.
    """
    while True:
        if e.wait(0.001):
            return
        with client_pipe(path) as pipe:
            try:
                payload = os.urandom(32)
                send(pipe, payload)
                echoed = recv(pipe, 32)
                assert echoed == payload
            except Disconnected:
                pass
def respond_to_clients(e, q):
    """Echo 32-byte requests for pipes taken from `q` until `e` is set.

    Each item on `q` is a pipe handle owned by this worker: it is closed
    once the request has been echoed back (or the client disconnected).

    The queue is polled with a short timeout rather than a blocking
    `q.get()`, so the worker notices the stop event even when no further
    connections arrive; a blocking get would otherwise stall shutdown
    indefinitely.
    """
    # Local import: the module only imports `Queue` from `queue`.
    from queue import Empty

    while not e.wait(0.001):
        try:
            handle = q.get(timeout=0.1)
        except Empty:
            # Nothing to serve yet; go back and re-check the stop event.
            continue
        with closing(handle) as pipe:
            try:
                data = recv(pipe, 32)
                send(pipe, data)
            except Disconnected:
                # The client went away mid-exchange; drop this connection.
                pass
def main():
    """Run the listener, the workers and the clients together for a while.

    Background loops are started in the order listener, workers, clients
    and are stopped in reverse order when the `ExitStack` unwinds.
    """
    # Alternative spelling of the same pipe name: r'\\.\pipe\bug'
    path = r'//./pipe/bug'
    clients = 10
    workers = 2
    timeout = 30.0

    q = Queue()

    # (thread name, loop function, extra arguments), in start order.
    loops = [('listener', listen, (path, q))]
    loops.extend(
        ('worker-%03d' % i, respond_to_clients, (q,))
        for i in range(workers)
    )
    loops.extend(
        ('client-%03d' % i, pound_on_pipe_server, (path,))
        for i in range(clients)
    )

    with ExitStack() as stack:
        for name, fn, args in loops:
            stack.enter_context(loop_concurrently(name, fn, *args))

        # Let this soak for a while.
        time.sleep(timeout)
        print('S: CLOSING')
# Script entry point: enable INFO-level logging, then run the demo.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment