Skip to content

Instantly share code, notes, and snippets.

@AndreLouisCaron
Last active March 1, 2024 17:08
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreLouisCaron/842178ef3c7adc3c6460f4872ea279cf to your computer and use it in GitHub Desktop.
Save AndreLouisCaron/842178ef3c7adc3c6460f4872ea279cf to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import asyncio
import os
import sys
async def echo(reader, writer):
    """Echo everything read from ``reader`` back through ``writer``.

    Reads chunks of up to 8 KiB until ``reader.read()`` returns an empty
    value (end of stream), writing each chunk back unchanged.

    The writer is always closed on exit, including on error, so the
    underlying transport is released once the session ends.
    """
    try:
        data = await reader.read(8 * 1024)
        while data:
            writer.write(data)
            # Apply backpressure: wait for the write buffer to flush
            # before reading more from the peer.
            await writer.drain()
            data = await reader.read(8 * 1024)
    finally:
        writer.close()
# Default buffer limit (64 KiB) handed to ``asyncio.StreamReader``.
DEFAULT_LIMIT = 64 * 1024
async def start_pipe_server(client_connected_cb, *, path,
                            loop=None, limit=DEFAULT_LIMIT):
    """Begin accepting connections on a Win32 named pipe.

    ``client_connected_cb`` is handed to each ``StreamReaderProtocol``
    created for an incoming connection.  ``path`` is passed as the pipe
    address to ``loop.start_serving_pipe()``.  ``limit`` is the buffer
    limit given to each ``StreamReader``.

    Returns the first object produced by ``loop.start_serving_pipe()``.
    """
    if not loop:
        loop = asyncio.get_event_loop()

    def make_protocol():
        # One fresh reader/protocol pair per accepted connection.
        stream = asyncio.StreamReader(limit=limit, loop=loop)
        return asyncio.StreamReaderProtocol(
            stream, client_connected_cb, loop=loop
        )

    # NOTE: has no "wait_closed()" coroutine method.
    pipe_server, *_ = await loop.start_serving_pipe(
        make_protocol, address=path
    )
    return pipe_server
async def open_pipe_connection(path=None, *, loop=None,
                               limit=DEFAULT_LIMIT, **kwds):
    """Open a client connection to a Win32 named pipe server.

    ``path`` is the pipe address; ``limit`` is the buffer limit given to
    the ``StreamReader``.  Extra keyword arguments are forwarded to
    ``loop.create_pipe_connection()``.

    Returns a ``(reader, writer)`` pair of asyncio stream objects.
    """
    if not loop:
        loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(limit=limit, loop=loop)
    stream_protocol = asyncio.StreamReaderProtocol(stream_reader, loop=loop)

    def protocol_factory():
        # Always hand back the single protocol built above.
        return stream_protocol

    transport, _ = await loop.create_pipe_connection(
        protocol_factory, path, **kwds
    )
    stream_writer = asyncio.StreamWriter(
        transport, stream_protocol, stream_reader, loop
    )
    return stream_reader, stream_writer
# Expose one platform-agnostic pair of names: UNIX sockets everywhere
# except Windows, where Win32 named pipes are used instead.
if sys.platform != 'win32':
    start_ipc_server = asyncio.start_unix_server
    open_ipc_connection = asyncio.open_unix_connection
else:
    start_ipc_server = start_pipe_server
    open_ipc_connection = open_pipe_connection
async def serve_until(cancel, *, path, session, loop=None, ready=None):
    """Run an IPC server until ``cancel`` completes.

    Parameters:
        cancel: awaitable; the server stops accepting connections once it
            completes.
        path: address passed to ``start_ipc_server`` (UNIX socket path or
            Win32 pipe name).
        session: coroutine function called as ``session(reader, writer)``
            for every accepted connection.
        loop: event loop used to spawn session tasks; defaults to
            ``asyncio.get_event_loop()``.
        ready: optional future.  It is resolved with ``None`` once the
            server is listening, or with the startup exception if the
            server could not be started, so a waiting caller never hangs.

    Raises:
        Whatever ``start_ipc_server`` raises, ``asyncio.TimeoutError`` if
        startup takes longer than 5 seconds, and the first exception
        raised by any session task.
    """
    loop = loop or asyncio.get_event_loop()
    sessions = set()

    def client_connected(reader, writer):
        # Track every session task so shutdown can wait for all of them.
        sessions.add(loop.create_task(session(reader, writer)))

    # Start accepting connections.
    print('S: prepping server...')
    try:
        server = await asyncio.wait_for(
            start_ipc_server(client_connected, path=path),
            timeout=5.0
        )
    except Exception as error:
        # Propagate the startup failure to whoever is waiting on ``ready``
        # instead of leaving that future pending forever.
        if ready is not None and not ready.done():
            ready.set_exception(error)
        raise
    try:
        # Let the caller know we're ready.
        print('S: signalling caller...')
        if ready is not None and not ready.done():
            ready.set_result(None)
        # Serve clients until we're told otherwise.
        print('S: serving...')
        await cancel
    finally:
        # Stop accepting connections.
        print('S: closing...')
        server.close()
        if hasattr(server, 'wait_closed'):
            await server.wait_closed()
        # Wait for all sessions to complete.  The loop variable must not
        # be called ``session``: that would rebind the parameter used by
        # the ``client_connected`` closure.
        print('S: waiting on sessions...')
        for finished in asyncio.as_completed(sessions):
            await finished
        # Remove the UNIX socket.
        if sys.platform != 'win32':
            print('S: unlinking UNIX socket...')
            try:
                os.unlink(path)
            except FileNotFoundError:
                # Already gone: newer asyncio versions remove the socket
                # file themselves when the server is closed.
                pass
async def main():
    """Program entry point.

    Starts the echo server as a background task, waits for it to become
    ready, connects as a client, sends one line, prints the echoed reply,
    and finally asks the server to shut down and waits for it.

    Raises:
        Any exception raised by the server task, or ``RuntimeError`` if
        the server task finishes without ever signalling readiness.
    """
    loop = asyncio.get_event_loop()
    cancel = asyncio.Future()

    def ctrl_c():
        # Idempotent shutdown request: safe to call more than once.
        if not cancel.done():
            cancel.set_result(None)

    # loop.add_signal_handler(signal.SIGINT, ctrl_c)
    if sys.platform == 'win32':
        path = r'\\.\pipe\echo'
    else:
        path = './echo.sock'
    # Start accepting connections.
    ready = asyncio.Future()
    server = loop.create_task(serve_until(
        cancel, path=path, session=echo, ready=ready
    ))
    try:
        # Wait until the server is ready.  Also watch the server task
        # itself so a failed startup surfaces instead of hanging here.
        print('C: waiting for server to boot...')
        await asyncio.wait(
            [ready, server], return_when=asyncio.FIRST_COMPLETED
        )
        if not ready.done():
            # The server task ended first: re-raise its error, if any.
            await server
            raise RuntimeError('server exited before becoming ready')
        await ready
        # Try connecting to the server.
        reader, writer = await open_ipc_connection(path=path)
        try:
            print('C: sending request...')
            writer.write(b'hello\n')
            print('C: waiting for response...')
            print((await reader.readline()).decode('utf-8').strip())
        finally:
            print('C: closing writer...')
            writer.close()
    finally:
        # Stop accepting connections.
        print('C: stopping server...')
        ctrl_c()
        print('C: waiting for server to shutdown...')
        await server
if __name__ == '__main__':
    import logging
    logging.basicConfig(level=logging.DEBUG)
    # Create the loop explicitly rather than relying on
    # ``asyncio.get_event_loop()`` to make one implicitly, which newer
    # Python versions no longer do.  On Windows the proactor loop is
    # used, as the named-pipe helpers call its pipe methods.
    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        # Release the loop's resources even if main() failed.
        loop.close()
@aeroaks
Copy link

aeroaks commented Jan 23, 2018

Thank you for the code. I am trying to use the above code to connect to a Named Pipe in Windows and read streaming data from it, but I am not able to connect to the pipe. I am stuck at S: prepping server.... Could you suggest what the mistake is?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment