Skip to content

Instantly share code, notes, and snippets.

@njsmith
Created December 18, 2018 06:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save njsmith/4af6b1be0a793592fafcabf00991ce73 to your computer and use it in GitHub Desktop.
Save njsmith/4af6b1be0a793592fafcabf00991ce73 to your computer and use it in GitHub Desktop.
# This ignores some issues that a real production-quality version should
# address; see https://github.com/python-trio/trio/issues/279 for
# the full details. But it should handle simple cases OK.
async def open_unix_domain_listeners(path, *, permissions=None):
sock = trio.socket.socket(trio.socket.AF_UNIX, trio.socket.SOCK_STREAM)
try:
os.unlink(path)
except OSError:
pass
await sock.bind(path)
if permissions is not None:
os.fchmod(sock.fileno(), permissions)
sock.listen(100)
return [trio.SocketListener(sock)]
async def serve_unix_domain(
handler,
path,
*,
permissions=None,
handler_nursery=None,
task_status=trio.TASK_STATUS_IGNORED
):
listeners = await open_unix_domain_listeners(path, permissions=permissions)
await trio.serve_listeners(
handler,
listeners,
handler_nursery=handler_nursery,
task_status=task_status
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment