Created
November 27, 2019 12:11
-
-
Save willcl-ark/730fbb96f25422c38729e6eee83e0a1e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""lightning mesh proxy. | |
Usage: | |
proxy.py MY_PUBKEY REMOTE_PUBKEY REMOTE_GID | |
Arguments: | |
MY_PUBKEY my node pubkey | |
REMOTE_PUBKEY the pubkey of the remote node | |
REMOTE_GID the GID of the remote node | |
""" | |
from docopt import docopt | |
import logging | |
import socket | |
import struct | |
import time | |
import trio | |
import lnproxy.ln_msg as ln_msg | |
logger = logging.getLogger(f"{'PROXY':<6s}") | |
MSG_LEN: int = 2 | |
MSG_LEN_MAC: int = 16 | |
MSG_HEADER: int = MSG_LEN + MSG_LEN_MAC | |
MSG_MAC: int = 16 | |
# set up memory channels between server and mesh connection | |
# shared between all socket connections | |
send_to_mesh, receive_from_server = trio.open_memory_channel(50) | |
send_to_server, receive_from_mesh = trio.open_memory_channel(50) | |
async def unlink_socket(sock): | |
"""Unlink a Unix Socket | |
""" | |
socket_path = trio.Path(sock) | |
try: | |
await socket_path.unlink() | |
except OSError: | |
if await socket_path.exists(): | |
raise | |
async def receive_exactly(stream, length, timeout=500): | |
res = b"" | |
end = time.time() + timeout | |
while len(res) < length and time.time() < end: | |
await stream.receive_some(length - len(res)) | |
if len(res) == length: | |
return res | |
else: | |
raise TimeoutError("Didn't receive enough bytes within the timeout, discarding") | |
async def proxy(read_stream, write_stream): | |
"""Proxy message traffic from one stream to another. | |
Handle and parse certain lightning messages. | |
""" | |
while True: | |
try: | |
# receive and unpack the header | |
header = await receive_exactly(read_stream, MSG_HEADER) | |
body_len = struct.unpack(">H", header[:MSG_LEN])[0] | |
body_len_mac = struct.unpack("16s", header[-16:])[0] | |
# receive the body | |
body = await receive_exactly(read_stream, body_len) | |
# parse the message within and modify header and body if necessary | |
header, body = ln_msg.parse(header, body) | |
# receive the body mac | |
body_mac = await receive_exactly(read_stream, MSG_MAC) | |
# send full message to remote | |
await write_stream.send_all(header + body + body_mac) | |
except TimeoutError: | |
break | |
except Exception: | |
logger.exception("Exception inside proxy.proxy") | |
break | |
async def socket_handler_local(stream): | |
"""Handles a listening socket. | |
Passes the stream to proxy() which will proxy it with a mesh memory_channel | |
:arg stream: a trio.SocketStream for the listening socket | |
""" | |
try: | |
async with trio.open_nursery() as nursery: | |
nursery.start_soon(proxy, stream, send_to_mesh) | |
nursery.start_soon(proxy, receive_from_mesh, stream) | |
except Exception: | |
logger.exception(f"Local socket_handler: crashed") | |
finally: | |
await stream.aclose() | |
async def serve_unix_socket(socket_address): | |
"""Serve a listening unix socket on 'socket_address' | |
Handles the socket using socket_handler_local | |
""" | |
await unlink_socket(socket_address) | |
listeners = [] | |
# Create the listening socket, bind to it and listen | |
sock = trio.socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
await sock.bind(socket_address) | |
sock.listen() | |
listeners.append(trio.SocketListener(sock)) | |
try: | |
# Manage the listening with the handler | |
await trio.serve_listeners(socket_handler_local, listeners) | |
except Exception: | |
logger.exception("serve_unix_socket: crashed") | |
finally: | |
# unlink the socket if it gets closed | |
await unlink_socket(socket_address) | |
def main(): | |
_args = docopt(__doc__, version="Lightning mesh proxy 0.2.1") | |
socket_address = f"/tmp/{_args['REMOTE_PUBKEY']}" | |
try: | |
trio.run(serve_unix_socket, socket_address) | |
except Exception: | |
logger.exception("Main thread stopped") | |
if __name__ == "__main__": | |
try: | |
main() | |
except KeyboardInterrupt: | |
print("Stopping Proxy") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment