Skip to content

Instantly share code, notes, and snippets.

@whoisjeeva
Created January 15, 2021 11:47
Show Gist options
  • Save whoisjeeva/b685ee4df9fb78832a8b4eda59fc7b64 to your computer and use it in GitHub Desktop.
Save whoisjeeva/b685ee4df9fb78832a8b4eda59fc7b64 to your computer and use it in GitHub Desktop.
Socks5 proxy server in Python
import socket
import threading
import select
SOCKS_VERSION = 5
class Proxy:
def __init__(self):
self.username = "username"
self.password = "password"
def handle_client(self, connection):
# greeting header
# read and unpack 2 bytes from a client
version, nmethods = connection.recv(2)
# get available methods [0, 1, 2]
methods = self.get_available_methods(nmethods, connection)
# accept only USERNAME/PASSWORD auth
if 2 not in set(methods):
# close connection
connection.close()
return
# send welcome message
connection.sendall(bytes([SOCKS_VERSION, 2]))
if not self.verify_credentials(connection):
return
# request (version=5)
version, cmd, _, address_type = connection.recv(4)
if address_type == 1: # IPv4
address = socket.inet_ntoa(connection.recv(4))
elif address_type == 3: # Domain name
domain_length = connection.recv(1)[0]
address = connection.recv(domain_length)
address = socket.gethostbyname(address)
# convert bytes to unsigned short array
port = int.from_bytes(connection.recv(2), 'big', signed=False)
try:
if cmd == 1: # CONNECT
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((address, port))
bind_address = remote.getsockname()
print("* Connected to {} {}".format(address, port))
else:
connection.close()
addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
port = bind_address[1]
reply = b''.join([
SOCKS_VERSION.to_bytes(1, 'big'),
int(0).to_bytes(1, 'big'),
int(0).to_bytes(1, 'big'),
int(1).to_bytes(1, 'big'),
addr.to_bytes(4, 'big'),
port.to_bytes(2, 'big')
])
except Exception as e:
# return connection refused error
reply = self.generate_failed_reply(address_type, 5)
connection.sendall(reply)
# establish data exchange
if reply[1] == 0 and cmd == 1:
self.exchange_loop(connection, remote)
connection.close()
def exchange_loop(self, client, remote):
while True:
# wait until client or remote is available for read
r, w, e = select.select([client, remote], [], [])
if client in r:
data = client.recv(4096)
if remote.send(data) <= 0:
break
if remote in r:
data = remote.recv(4096)
if client.send(data) <= 0:
break
def generate_failed_reply(self, address_type, error_number):
return b''.join([
SOCKS_VERSION.to_bytes(1, 'big'),
error_number.to_bytes(1, 'big'),
int(0).to_bytes(1, 'big'),
address_type.to_bytes(1, 'big'),
int(0).to_bytes(4, 'big'),
int(0).to_bytes(4, 'big')
])
def verify_credentials(self, connection):
version = ord(connection.recv(1)) # should be 1
username_len = ord(connection.recv(1))
username = connection.recv(username_len).decode('utf-8')
password_len = ord(connection.recv(1))
password = connection.recv(password_len).decode('utf-8')
if username == self.username and password == self.password:
# success, status = 0
response = bytes([version, 0])
connection.sendall(response)
return True
# failure, status != 0
response = bytes([version, 0xFF])
connection.sendall(response)
connection.close()
return False
def get_available_methods(self, nmethods, connection):
methods = []
for i in range(nmethods):
methods.append(ord(connection.recv(1)))
return methods
def run(self, host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen()
print("* Socks5 proxy server is running on {}:{}".format(host, port))
while True:
conn, addr = s.accept()
print("* new connection from {}".format(addr))
t = threading.Thread(target=self.handle_client, args=(conn,))
t.start()
if __name__ == "__main__":
proxy = Proxy()
proxy.run("127.0.0.1", 3000)
@ankit-droidshift
Copy link

hello sir can you edit it to support udp please

@steelliberty
Copy link

Here is an untested version of the original with a few add-ons and UDP -- thank to UnKnownCoder:

import socket
import threading
import select

SOCKS_VERSION = 5

class Proxy:
def init(self):
self.username = "username"
self.password = "password"

def handle_client(self, connection):
    try:
        # Greeting header
        version, nmethods = connection.recv(2)
        methods = self.get_available_methods(nmethods, connection)
        if 2 not in set(methods):
            connection.close()
            return

        connection.sendall(bytes([SOCKS_VERSION, 2]))

        if not self.verify_credentials(connection):
            return

        version, cmd, _, address_type = connection.recv(4)
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = connection.recv(1)[0]
            address = connection.recv(domain_length)
            address = socket.gethostbyname(address)

        port = int.from_bytes(connection.recv(2), 'big', signed=False)

        if cmd == 1:  # CONNECT
            self.handle_tcp(connection, address, port)
        elif cmd == 3:  # UDP ASSOCIATE
            self.handle_udp(connection, address, port)
        else:
            connection.close()
    except Exception as e:
        print(f"Error handling client: {e}")

def handle_tcp(self, connection, address, port):
    try:
        remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        remote.connect((address, port))
        bind_address = remote.getsockname()
        print(f"* Connected to {address} {port}")

        addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
        port = bind_address[1]

        reply = b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(1).to_bytes(1, 'big'),
            addr.to_bytes(4, 'big'),
            port.to_bytes(2, 'big')
        ])
    except Exception as e:
        reply = self.generate_failed_reply(1, 5)
        print(f"TCP connection error: {e}")

    connection.sendall(reply)
    if reply[1] == 0:
        self.exchange_loop(connection, remote)

    connection.close()

def handle_udp(self, connection, address, port):
    try:
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        bind_address = udp_socket.getsockname()
        print(f"* UDP Associate with {address} {port}")

        addr = int.from_bytes(socket.inet_aton(bind_address[0]), 'big', signed=False)
        port = bind_address[1]

        reply = b''.join([
            SOCKS_VERSION.to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(0).to_bytes(1, 'big'),
            int(1).to_bytes(1, 'big'),
            addr.to_bytes(4, 'big'),
            port.to_bytes(2, 'big')
        ])
        connection.sendall(reply)

        while True:
            data, client_address = udp_socket.recvfrom(4096)
            udp_socket.sendto(data, (address, port))
    except Exception as e:
        print(f"UDP Associate error: {e}")

    connection.close()

def exchange_loop(self, client, remote):
    while True:
        r, w, e = select.select([client, remote], [], [])
        if client in r:
            data = client.recv(4096)
            if remote.send(data) <= 0:
                break
        if remote in r:
            data = remote.recv(4096)
            if client.send(data) <= 0:
                break

def generate_failed_reply(self, address_type, error_number):
    return b''.join([
        SOCKS_VERSION.to_bytes(1, 'big'),
        error_number.to_bytes(1, 'big'),
        int(0).to_bytes(1, 'big'),
        address_type.to_bytes(1, 'big'),
        int(0).to_bytes(4, 'big'),
        int(0).to_bytes(4, 'big')
    ])

def verify_credentials(self, connection):
    version = connection.recv(1)[0]  # should be 1
    username_len = connection.recv(1)[0]
    username = connection.recv(username_len).decode('utf-8')
    password_len = connection.recv(1)[0]
    password = connection.recv(password_len).decode('utf-8')

    if username == self.username and password == self.password:
        response = bytes([version, 0])
        connection.sendall(response)
        return True

    response = bytes([version, 0xFF])
    connection.sendall(response)
    connection.close()
    return False

def get_available_methods(self, nmethods, connection):
    return [connection.recv(1)[0] for _ in range(nmethods)]

def run(self, host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen()
        print(f"* SOCKS5 proxy server is running on {host}:{port}")
        while True:
            conn, addr = s.accept()
            print(f"* New connection from {addr}")
            threading.Thread(target=self.handle_client, args=(conn,)).start()

if name == "main":
proxy = Proxy()
proxy.run("127.0.0.1", 3000)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment