Skip to content

Instantly share code, notes, and snippets.

@honmaple
Created March 16, 2022 02:11
Show Gist options
  • Save honmaple/1b181456f58bed222c18047a725cac5f to your computer and use it in GitHub Desktop.
Save honmaple/1b181456f58bed222c18047a725cac5f to your computer and use it in GitHub Desktop.
transparent proxy with python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import select
import sys
# Numeric value of the Linux IP_TRANSPARENT socket option (19 in
# <linux/in.h>); it is passed to setsockopt() at the IPPROTO_IP level.
IP_TRANSPARENT = 19
# Address the listening socket binds to; "0.0.0.0" means every local IPv4
# interface.
HOST = "0.0.0.0"
# TCP port the listening socket binds to.
PORT = 10
# Maximum number of bytes requested from a socket per recv() call.
buffer_size = 4096
class TheServer:
    """Single-threaded transparent TCP proxy multiplexed with ``select``.

    For every accepted client connection a second ("upstream") socket is
    opened towards the address the client socket is locally bound to, and
    bytes are relayed in both directions between the two sockets.

    Attributes:
        server: The listening socket.
        input_list: Sockets currently watched for readability.
        channel: Maps each relayed socket to its counterpart (client to
            upstream and upstream to client).
        data: The chunk most recently read by ``main_loop``; consumed by
            ``on_recv``.
    """

    def __init__(self, host, port):
        """Create the listening socket bound to ``(host, port)``.

        Raises:
            OSError: If the socket cannot be configured, bound or put into
                listening mode. The socket is closed before re-raising.
        """
        # Per-instance state: keeping these on the class would share one
        # list/dict between every TheServer instance.
        self.input_list = []
        self.channel = {}
        self.data = b""
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_IP, IP_TRANSPARENT, 1)
            self.server.bind((host, port))
            self.server.listen(32)
        except Exception:
            # Do not leak the descriptor when setup fails.
            self.server.close()
            raise

    def forward(self, c_ip, c_port, r_ip, r_port):
        """Open the upstream connection for one client.

        The outgoing socket is bound to the client's own address
        ``(c_ip, c_port)`` before connecting to ``(r_ip, r_port)``, so the
        connection is made with the client's address as its source.

        Returns:
            The connected socket (with a 5 second timeout set), or ``False``
            when the socket could not be set up or connected.
        """
        upstream = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            upstream.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            upstream.setsockopt(socket.IPPROTO_IP, IP_TRANSPARENT, 1)
            upstream.bind((c_ip, c_port))
            upstream.settimeout(5)
            upstream.connect((r_ip, r_port))
        except OSError as e:
            print(e)
            # Release the half-configured socket instead of leaking it.
            upstream.close()
            return False
        return upstream

    def main_loop(self):
        """Run the proxy forever: accept clients and relay their traffic.

        A failing connection is torn down and reported; it does not stop
        the loop. Only an error on the listening socket itself propagates.
        """
        if self.server not in self.input_list:
            self.input_list.append(self.server)
        while True:
            readable, _, _ = select.select(self.input_list, [], [])
            for s in readable:
                if s is self.server:
                    self.on_accept()
                    # input_list changed; start over with a fresh select().
                    break
                try:
                    self.data = s.recv(buffer_size)
                except OSError as e:
                    # A reset or timeout is handled like an orderly close.
                    print(e)
                    self.data = b""
                if not self.data:
                    self.on_close(s)
                    # The counterpart socket was closed too and may still be
                    # in ``readable``; restart with a fresh select().
                    break
                try:
                    self.on_recv(s)
                except OSError as e:
                    print(e)
                    self.on_close(s)
                    break

    def on_accept(self):
        """Accept one client and register it together with its upstream.

        The local address of the accepted socket is used as the destination
        to connect to (with transparent interception this is presumably the
        address the client originally dialled).
        """
        clientsock, (c_ip, c_port) = self.server.accept()
        r_ip, r_port = clientsock.getsockname()
        print("[ ] Connection from tcp://%s:%d to tcp://%s:%d" %
              (c_ip, c_port, r_ip, r_port))
        upstream = self.forward(c_ip, c_port, r_ip, r_port)
        if not upstream:
            print("Can't establish connection with remote server.")
            print("Closing connection with client side", c_ip, ":", c_port)
            clientsock.close()
            return
        print("%s:%d has connected" % (c_ip, c_port))
        self.input_list.append(clientsock)
        self.input_list.append(upstream)
        self.channel[clientsock] = upstream
        self.channel[upstream] = clientsock

    def on_close(self, s):
        """Tear down ``s`` and its counterpart and forget both sockets."""
        try:
            peer = s.getpeername()
        except OSError:
            # The socket may already be disconnected (e.g. after a reset).
            peer = "<unknown peer>"
        print(peer, "has disconnected")
        partner = self.channel.pop(s, None)
        if partner is not None:
            self.channel.pop(partner, None)
        for sock in (s, partner):
            if sock is None:
                continue
            if sock in self.input_list:
                self.input_list.remove(sock)
            sock.close()

    def on_recv(self, s):
        """Relay ``self.data`` (just read from ``s``) to the counterpart.

        Raises:
            OSError: If the counterpart socket cannot accept the data.
        """
        data = self.data
        # here we can parse and/or modify the data before send forward
        # The payload is arbitrary bytes; decode leniently for logging only.
        print(data.decode("utf-8", "replace"))
        # sendall() keeps writing until everything is sent; send() may stop
        # after a partial write and silently drop the remainder.
        self.channel[s].sendall(data)
if __name__ == '__main__':
    # Script entry point: build the proxy from the module-level settings
    # and serve until the user interrupts with Ctrl-C.
    proxy = TheServer(HOST, PORT)
    try:
        proxy.main_loop()
    except KeyboardInterrupt:
        print("Ctrl C - Stopping server")
        # Same effect as sys.exit(1): terminate with exit status 1.
        raise SystemExit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment