Skip to content

Instantly share code, notes, and snippets.

@willcl-ark
Created November 27, 2019 12:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save willcl-ark/730fbb96f25422c38729e6eee83e0a1e to your computer and use it in GitHub Desktop.
Save willcl-ark/730fbb96f25422c38729e6eee83e0a1e to your computer and use it in GitHub Desktop.
"""lightning mesh proxy.
Usage:
proxy.py MY_PUBKEY REMOTE_PUBKEY REMOTE_GID
Arguments:
MY_PUBKEY my node pubkey
REMOTE_PUBKEY the pubkey of the remote node
REMOTE_GID the GID of the remote node
"""
from docopt import docopt
import logging
import socket
import struct
import time
import trio
import lnproxy.ln_msg as ln_msg
# Module logger; its name is the literal "PROXY" left-aligned in a 6-character field.
logger = logging.getLogger(f"{'PROXY':<6s}")
# Size in bytes of the length field at the start of a message header
# (proxy() unpacks it as a big-endian unsigned short).
MSG_LEN: int = 2
# Size in bytes of the remaining header bytes after the length field
# (presumably the MAC of the length, going by the name -- confirm).
MSG_LEN_MAC: int = 16
# Total header size: the length field plus the 16 bytes that follow it.
MSG_HEADER: int = MSG_LEN + MSG_LEN_MAC
# Number of bytes proxy() reads after the body (the body MAC).
MSG_MAC: int = 16
# set up memory channels between server and mesh connection
# shared between all socket connections
# Each open_memory_channel(50) call returns a (send end, receive end) pair
# with a buffer of 50 items: send_to_mesh feeds receive_from_server, and
# send_to_server feeds receive_from_mesh.
send_to_mesh, receive_from_server = trio.open_memory_channel(50)
send_to_server, receive_from_mesh = trio.open_memory_channel(50)
async def unlink_socket(sock):
    """Remove the Unix socket file at path ``sock``.

    An OSError from the removal is ignored when the path no longer exists
    afterwards (for example, because it was never there); if the path is
    still present the error is re-raised.

    :arg sock: filesystem path of the socket to remove
    """
    path = trio.Path(sock)
    try:
        await path.unlink()
    except OSError:
        # Only a failure that leaves the file behind is a real problem.
        still_present = await path.exists()
        if still_present:
            raise
async def receive_exactly(stream, length, timeout=500):
    """Read exactly ``length`` bytes from ``stream``.

    Calls ``stream.receive_some`` repeatedly, accumulating the returned
    chunks until ``length`` bytes have been collected.

    :arg stream: an object with an awaitable ``receive_some(max_bytes)`` method
    :arg length: the number of bytes to read
    :arg timeout: seconds to keep trying before giving up
    :returns: a bytes object of exactly ``length`` bytes
    :raises TimeoutError: if the deadline passes, or the stream reports
        end-of-file, before ``length`` bytes have arrived
    """
    chunks = []
    remaining = length
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    # NOTE: the deadline is only checked between reads; it does not interrupt
    # a receive_some() call that is itself blocked waiting for data.
    deadline = time.monotonic() + timeout
    while remaining > 0 and time.monotonic() < deadline:
        chunk = await stream.receive_some(remaining)
        if not chunk:
            # An empty read means the peer closed the stream; retrying would
            # just spin until the deadline. TimeoutError is kept so callers
            # have a single "message incomplete" exception to handle.
            raise TimeoutError(
                "Stream closed before enough bytes were received, discarding"
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    if remaining > 0:
        raise TimeoutError("Didn't receive enough bytes within the timeout, discarding")
    return b"".join(chunks)
async def proxy(read_stream, write_stream):
    """Proxy message traffic from one stream to another.

    Each message is read as a header of MSG_HEADER bytes, a body whose length
    is given by the first MSG_LEN bytes of the header, and a trailing body MAC
    of MSG_MAC bytes. Header and body are passed through ln_msg.parse(), which
    may return modified versions, and the result is written to write_stream.

    The loop ends quietly when a message cannot be read in full
    (TimeoutError from receive_exactly); any other exception is logged
    before the loop ends.

    :arg read_stream: source passed to receive_exactly()
    :arg write_stream: destination; must provide an awaitable ``send_all``
    """
    # Every failure ends the proxying, so a single try wraps the whole loop.
    try:
        while True:
            # receive the header and unpack the body length (big-endian unsigned short)
            header = await receive_exactly(read_stream, MSG_HEADER)
            (body_len,) = struct.unpack(">H", header[:MSG_LEN])
            # receive the body
            body = await receive_exactly(read_stream, body_len)
            # parse the message within and modify header and body if necessary
            header, body = ln_msg.parse(header, body)
            # receive the body mac
            body_mac = await receive_exactly(read_stream, MSG_MAC)
            # send full message to remote
            await write_stream.send_all(header + body + body_mac)
    except TimeoutError:
        # No complete message arrived; stop without logging.
        return
    except Exception:
        logger.exception("Exception inside proxy.proxy")
async def socket_handler_local(stream):
    """Handle one connection accepted on the listening socket.

    Starts two proxy() tasks in a nursery: one from the socket stream to
    send_to_mesh, one from receive_from_mesh to the socket stream. Exceptions
    escaping the nursery are logged, and the stream is always closed on exit.

    NOTE(review): trio memory channels expose send()/receive() rather than the
    send_all()/receive_some() that proxy() uses -- confirm these channel
    objects work as proxy() endpoints.

    :arg stream: a trio.SocketStream for the listening socket
    """
    # Leaving the stream's async context closes it, success or failure.
    async with stream:
        try:
            async with trio.open_nursery() as tasks:
                # local socket -> mesh
                tasks.start_soon(proxy, stream, send_to_mesh)
                # mesh -> local socket
                tasks.start_soon(proxy, receive_from_mesh, stream)
        except Exception:
            logger.exception(f"Local socket_handler: crashed")
async def serve_unix_socket(socket_address):
    """Listen on the Unix socket at ``socket_address`` and serve connections.

    Any existing file at the path is removed first. Each accepted connection
    is handled by socket_handler_local. Exceptions raised while serving are
    logged, and the socket file is removed again when serving stops.

    :arg socket_address: filesystem path for the Unix socket
    """
    # Clear out any leftover socket file before binding.
    await unlink_socket(socket_address)

    # Create the listening socket, bind to it and listen
    listening_sock = trio.socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    await listening_sock.bind(socket_address)
    listening_sock.listen()
    listeners = [trio.SocketListener(listening_sock)]

    try:
        # Hand every accepted connection to the handler
        await trio.serve_listeners(socket_handler_local, listeners)
    except Exception:
        logger.exception("serve_unix_socket: crashed")
    finally:
        # remove the socket file once serving has ended
        await unlink_socket(socket_address)
def main():
    """Parse the command line and run the proxy server.

    The listening Unix socket is placed under /tmp and named after the
    REMOTE_PUBKEY argument. Exceptions raised by the trio run are logged
    rather than propagated.
    """
    cli_args = docopt(__doc__, version="Lightning mesh proxy 0.2.1")
    remote_pubkey = cli_args['REMOTE_PUBKEY']
    socket_address = f"/tmp/{remote_pubkey}"
    try:
        trio.run(serve_unix_socket, socket_address)
    except Exception:
        logger.exception("Main thread stopped")
# Script entry point: run the proxy only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it passes through
        # main()'s handler and is reported here instead of as a traceback.
        print("Stopping Proxy")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment