Skip to content

Instantly share code, notes, and snippets.

@michaelneu
Created March 19, 2019 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaelneu/5bf8b99a904cf8c1fa67aadfaeb741d0 to your computer and use it in GitHub Desktop.
Save michaelneu/5bf8b99a904cf8c1fa67aadfaeb741d0 to your computer and use it in GitHub Desktop.
Basic relay proxy in Python
#!/usr/bin/env python3
import threading
import socket
class Relay(threading.Thread):
    """Thread that copies bytes in one direction, from one socket to another.

    Each received chunk is passed through :meth:`parse_chunk`, printed with
    the direction indicator, and then forwarded unmodified.  Override
    :meth:`parse_chunk` in a subclass to change what is printed.
    """

    def __init__(self, stop_listening, socket_a, socket_b, direction_indicator):
        """Remember the endpoints; call ``start()`` to begin relaying.

        Args:
            stop_listening: Object with an ``is_set()`` method (such as a
                ``threading.Event``); it is checked before every read.
            socket_a: Connected socket that data is read from.
            socket_b: Connected socket that data is written to.
            direction_indicator: Prefix printed in front of every chunk.
        """
        super().__init__()
        self._stop_listening = stop_listening
        self._socket_a = socket_a
        self._socket_b = socket_b
        self._direction_indicator = direction_indicator

    def parse_chunk(self, chunk):
        """Return the printable form of ``chunk``.

        The default returns the chunk unchanged.  Only the printed output is
        affected; the raw bytes are always what gets forwarded.
        """
        return chunk

    def run(self):
        """Forward data until end-of-stream, an error, or the stop flag.

        The stop flag is only examined between reads, so a relay blocked in
        ``recv`` does not notice it until more data (or EOF) arrives.
        """
        try:
            while not self._stop_listening.is_set():
                data = self._socket_a.recv(1024)
                if not data:
                    # Empty read: the sending side closed its end.
                    break
                print(self._direction_indicator, self.parse_chunk(data))
                # sendall() keeps writing until the whole chunk is out;
                # plain send() may transmit only part of it.
                self._socket_b.sendall(data)
        except (OSError, RuntimeError) as error:
            # Socket failures (reset, broken pipe, closed descriptor) are
            # reported as OSError, so they end the relay the same way.
            print(error)
        finally:
            self._signal_end_of_stream()

    def _signal_end_of_stream(self):
        """Half-close the outgoing socket so its peer sees end-of-stream."""
        try:
            self._socket_b.shutdown(socket.SHUT_WR)
        except OSError:
            # The socket is already closed or disconnected; there is nobody
            # left to notify, so this is deliberately ignored.
            pass
class TcpProxy:
    """Accept TCP connections on a local port and relay them to a destination.

    For every accepted client a new connection to ``destination`` is opened
    and two relay threads are started, one for each direction.
    """

    def __init__(self, port, destination, relay_class=Relay):
        """Create the listening socket and bind it to all IPv4 interfaces.

        Args:
            port: Local TCP port to bind on ``0.0.0.0``.
            destination: Address passed to ``socket.connect()`` for every
                client, e.g. a ``(host, port)`` tuple.
            relay_class: Class instantiated as
                ``relay_class(stop_event, source, target, indicator)`` and
                started with ``start()`` for each direction.

        Raises:
            OSError: If the port cannot be bound.
        """
        self._destination = destination
        self._relay_class = relay_class
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.bind(("0.0.0.0", port))
        except BaseException:
            # Do not leak the descriptor when the port is unavailable.
            self._socket.close()
            raise

    def listen(self, backlog=100):
        """Serve clients until interrupted with Ctrl-C.

        A ``KeyboardInterrupt`` sets the stop event shared with all relays
        and ends the accept loop.  The listening socket is closed when this
        method exits, so the proxy stops accepting new connections.

        Args:
            backlog: Value passed to ``socket.listen()``.
        """
        self._socket.listen(backlog)
        stop_event = threading.Event()
        try:
            while not stop_event.is_set():
                try:
                    client, _ = self._socket.accept()
                    print("client connected")
                    self._relay_connection(stop_event, client)
                except KeyboardInterrupt:
                    stop_event.set()
        finally:
            self._socket.close()

    def _relay_connection(self, stop_event, client):
        """Connect ``client`` to the destination and start both relays."""
        destination_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            destination_socket.connect(self._destination)
        except OSError as error:
            # An unreachable destination must only drop this one client,
            # not bring down the whole proxy.
            print(error)
            destination_socket.close()
            client.close()
            return
        relay_a = self._relay_class(stop_event, client, destination_socket, "in >>\t")
        relay_a.start()
        relay_b = self._relay_class(stop_event, destination_socket, client, "<< out\t")
        relay_b.start()
if __name__ == "__main__":
    # Script entry point: accept clients on local port 9000 and relay each
    # connection to localhost:9001.
    listen_port = 9000
    upstream_address = ("localhost", 9001)
    proxy = TcpProxy(listen_port, upstream_address)
    proxy.listen()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment