-
-
Save AndreLouisCaron/842178ef3c7adc3c6460f4872ea279cf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import asyncio | |
import os | |
import sys | |
async def echo(reader, writer):
    """Echo everything received on *reader* back through *writer*.

    Reads chunks of up to 8 KiB until ``reader.read()`` returns an empty
    value (end of stream), writing each chunk back unchanged.

    The writer is always closed on exit, whether the stream ended normally
    or an error occurred, so the underlying transport is released.
    """
    try:
        data = await reader.read(8 * 1024)
        while data:
            writer.write(data)
            # Wait for the write buffer to flush so a slow peer cannot make
            # us buffer an unbounded amount of data.
            await writer.drain()
            data = await reader.read(8 * 1024)
    finally:
        # Without this the server-side transport stays open after the peer
        # disconnects, which also keeps the listening server from ever
        # reporting itself as fully closed.
        writer.close()
# Default ``limit`` (64 KiB) passed to ``asyncio.StreamReader`` by the
# named-pipe helpers in this module.
DEFAULT_LIMIT = 1 << 16
async def start_pipe_server(client_connected_cb, *, path,
                            loop=None, limit=DEFAULT_LIMIT):
    """Listen for incoming connections on a Win32 named pipe.

    Each accepted connection gets its own ``StreamReader`` (created with
    *limit*) wrapped in a ``StreamReaderProtocol`` bound to
    *client_connected_cb*.

    Returns the first item produced by ``loop.start_serving_pipe()``.
    """
    if not loop:
        loop = asyncio.get_event_loop()

    def make_protocol():
        # A fresh reader/protocol pair per connection.
        stream = asyncio.StreamReader(limit=limit, loop=loop)
        return asyncio.StreamReaderProtocol(
            stream, client_connected_cb, loop=loop
        )

    # NOTE: the returned server has no "wait_closed()" coroutine method.
    servers = await loop.start_serving_pipe(make_protocol, address=path)
    first, *_ = servers
    return first
async def open_pipe_connection(path=None, *, loop=None,
                               limit=DEFAULT_LIMIT, **kwds):
    """Open a client connection to a Win32 named pipe server.

    Extra keyword arguments are forwarded to
    ``loop.create_pipe_connection()``.

    Returns a ``(reader, writer)`` pair of asyncio stream objects.
    """
    if not loop:
        loop = asyncio.get_event_loop()

    stream = asyncio.StreamReader(limit=limit, loop=loop)
    stream_protocol = asyncio.StreamReaderProtocol(stream, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol created above.
        return stream_protocol

    transport, _ = await loop.create_pipe_connection(
        protocol_factory, path, **kwds
    )
    return stream, asyncio.StreamWriter(
        transport, stream_protocol, stream, loop
    )
# Expose platform-agnostic names: UNIX domain sockets everywhere except
# Windows, where the Win32 named pipe helpers are used instead.
if sys.platform != 'win32':
    start_ipc_server = asyncio.start_unix_server
    open_ipc_connection = asyncio.open_unix_connection
else:
    start_ipc_server = start_pipe_server
    open_ipc_connection = open_pipe_connection
async def serve_until(cancel, *, path, session, loop=None, ready=None):
    """Run an IPC server on *path* until *cancel* completes.

    Parameters:
        cancel: awaitable; the server stops accepting connections once it
            completes.
        path: UNIX socket path or Win32 named pipe path to listen on.
        session: coroutine function called as ``session(reader, writer)``
            for every accepted connection; each call runs in its own task.
        loop: event loop used to spawn session tasks (defaults to the
            current loop).
        ready: optional future. It is resolved with ``None`` once the
            server is accepting connections, or with the start-up exception
            if the server could not be started.

    Raises whatever exception prevented start-up (including
    ``asyncio.TimeoutError`` after 5 seconds) and re-raises the first
    exception raised by a session while waiting for sessions to finish.
    """
    loop = loop or asyncio.get_event_loop()
    sessions = set()

    def client_connected(reader, writer):
        sessions.add(loop.create_task(session(reader, writer)))

    # Start accepting connections.
    print('S: prepping server...')
    try:
        server = await asyncio.wait_for(
            start_ipc_server(client_connected, path=path),
            timeout=5.0
        )
    except Exception as error:
        # Tell the caller start-up failed; otherwise anyone awaiting
        # ``ready`` would wait forever.
        if ready is not None and not ready.done():
            ready.set_exception(error)
        raise

    try:
        # Let the caller know we're ready (``ready`` is optional).
        print('S: signalling caller...')
        if ready is not None and not ready.done():
            ready.set_result(None)

        # Serve clients until we're told otherwise.
        print('S: serving...')
        await cancel
    finally:
        # Stop accepting connections.
        print('S: closing...')
        server.close()
        if hasattr(server, 'wait_closed'):
            await server.wait_closed()

        try:
            # Wait for all sessions to complete.  The loop variable must
            # not be called ``session``: that would rebind the parameter
            # captured by ``client_connected``.
            print('S: waiting on sessions...')
            for finished in asyncio.as_completed(sessions):
                await finished
        finally:
            # Remove the UNIX socket, even if a session failed.
            if sys.platform != 'win32':
                print('S: unlinking UNIX socket...')
                try:
                    os.unlink(path)
                except FileNotFoundError:
                    # Recent asyncio versions remove the socket file
                    # themselves when the server is closed.
                    pass
async def main():
    """Program entry point.

    Starts the echo server in a background task, connects to it as a
    client, sends one line, prints the echoed response, then shuts the
    server down and waits for it to finish.

    Raises the server's exception if the server task ends before it
    signals readiness, instead of waiting for readiness forever.
    """
    loop = asyncio.get_event_loop()
    cancel = loop.create_future()

    def ctrl_c():
        # Idempotent: safe to call even if shutdown was already requested.
        if not cancel.done():
            cancel.set_result(None)

    # NOTE: signal handling is disabled; enabling the next line also
    # requires importing ``signal``.
    # loop.add_signal_handler(signal.SIGINT, ctrl_c)

    if sys.platform == 'win32':
        path = r'\\.\pipe\echo'
    else:
        path = './echo.sock'

    # Start accepting connections.
    ready = loop.create_future()
    server = loop.create_task(serve_until(
        cancel, path=path, session=echo, ready=ready
    ))
    try:
        # Wait until the server is ready, or until the server task ends,
        # whichever happens first, so a failed start-up cannot hang us.
        print('C: waiting for server to boot...')
        await asyncio.wait(
            [ready, server], return_when=asyncio.FIRST_COMPLETED
        )
        if not ready.done():
            # The server task finished without signalling readiness:
            # surface its exception, or fail explicitly if it had none.
            await server
            raise RuntimeError('server exited before becoming ready')
        # Propagates an exception stored on ``ready``, if any.
        await ready

        # Try connecting to the server.
        reader, writer = await open_ipc_connection(path=path)
        try:
            print('C: sending request...')
            writer.write(b'hello\n')
            print('C: waiting for response...')
            print((await reader.readline()).decode('utf-8').strip())
        finally:
            print('C: closing writer...')
            writer.close()
    finally:
        # Stop accepting connections.
        print('C: stopping server...')
        ctrl_c()
        print('C: waiting for server to shutdown...')
        await server
if __name__ == '__main__':
    import logging
    logging.basicConfig(level=logging.DEBUG)

    # Create the event loop explicitly instead of relying on
    # ``asyncio.get_event_loop()`` to create one implicitly.  On Windows
    # the proactor loop is selected, since the named-pipe helpers in this
    # file call ``start_serving_pipe`` / ``create_pipe_connection`` on it.
    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        # Release the loop's resources even if ``main()`` raised.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for the code. I am trying to use the above code to connect to a named pipe on Windows and read streaming data from it, but I am not able to connect to the pipe. I am stuck at
S: prepping server...
. Could you suggest what the mistake is?