Basic relay proxy in Python
#!/usr/bin/env python3 | |
import threading | |
import socket | |
class Relay(threading.Thread):
    """Thread that copies bytes one way, from ``socket_a`` to ``socket_b``.

    Every chunk read from ``socket_a`` is printed (after passing through
    :meth:`parse_chunk`) with ``direction_indicator`` in front of it, then
    written unchanged to ``socket_b``. Subclasses may override
    :meth:`parse_chunk` to change how traffic is displayed.

    Args:
        stop_listening: Event-like object; the loop exits once its
            ``is_set()`` returns true. It is only consulted between chunks,
            so a relay waiting inside a blocking ``recv`` will not notice it
            until ``recv`` returns.
        socket_a: Connected socket to read from.
        socket_b: Connected socket to write to.
        direction_indicator: Label printed before every relayed chunk.
    """

    def __init__(self, stop_listening, socket_a, socket_b, direction_indicator):
        super().__init__()
        self._stop_listening = stop_listening
        self._socket_a = socket_a
        self._socket_b = socket_b
        self._direction_indicator = direction_indicator

    def parse_chunk(self, chunk):
        """Return the form of ``chunk`` to print; the default is ``chunk`` itself."""
        return chunk

    def run(self):
        """Relay chunks until EOF, an error, or the stop event is set.

        When the loop ends, the write side of ``socket_b`` is shut down so the
        peer on that socket sees end-of-stream instead of waiting forever.
        """
        while not self._stop_listening.is_set():
            try:
                data = self._socket_a.recv(1024)
                if not data:
                    # Empty read: the peer on socket_a closed its side.
                    break
                print(self._direction_indicator, self.parse_chunk(data))
                # sendall() keeps writing until every byte is out; send() may
                # write only part of the buffer and drop the remainder.
                self._socket_b.sendall(data)
            except (OSError, RuntimeError) as error:
                # Socket failures (reset, broken pipe, closed descriptor) are
                # OSError subclasses. RuntimeError is still caught so that
                # parse_chunk overrides raising it keep the old behaviour.
                print(error)
                break

        # Propagate end-of-stream to the receiving peer (half-close).
        try:
            self._socket_b.shutdown(socket.SHUT_WR)
        except OSError:
            # Best effort: the socket may already be closed or disconnected.
            pass
class TcpProxy:
    """Listening TCP proxy that pairs every client with a destination socket.

    Args:
        port: Local TCP port to bind on all IPv4 interfaces (``0.0.0.0``).
        destination: Address passed to ``socket.connect`` for each accepted
            client, e.g. a ``(host, port)`` tuple.
        relay_class: Callable invoked as
            ``relay_class(stop_event, source, sink, label)``; the returned
            object must provide ``start()``. Defaults to ``Relay``.
    """

    def __init__(self, port, destination, relay_class=Relay):
        self._destination = destination
        self._relay_class = relay_class
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.bind(("0.0.0.0", port))

    def close(self):
        """Close the listening socket."""
        self._socket.close()

    def listen(self, backlog=100):
        """Accept clients and start two relays per connection.

        For every accepted client a new connection to the destination is
        opened and one relay is started per direction. The loop runs until a
        ``KeyboardInterrupt`` is received, which sets the stop event shared
        with all relays started by this call.

        Args:
            backlog: Value passed to ``socket.listen``.
        """
        self._socket.listen(backlog)
        stop_event = threading.Event()
        while not stop_event.is_set():
            try:
                client, _ = self._socket.accept()
                print("client connected")
                destination_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    destination_socket.connect(self._destination)
                except OSError as error:
                    # An unreachable destination must not take the whole
                    # proxy down: drop this client and keep accepting.
                    print(error)
                    destination_socket.close()
                    client.close()
                    continue
                relay_a = self._relay_class(stop_event, client, destination_socket, "in >>\t")
                relay_a.start()
                relay_b = self._relay_class(stop_event, destination_socket, client, "<< out\t")
                relay_b.start()
            except KeyboardInterrupt:
                stop_event.set()
if __name__ == "__main__":
    # Accept clients on local port 9000 and relay them to localhost:9001.
    upstream = ("localhost", 9001)
    proxy = TcpProxy(9000, upstream)
    proxy.listen()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment