Skip to content

Instantly share code, notes, and snippets.

@jiva
Forked from nneonneo/socks-proxy-simple.py
Created July 17, 2019 21:48
Show Gist options
  • Save jiva/486b2a98a1c4801054d57d63043e3345 to your computer and use it in GitHub Desktop.
Save jiva/486b2a98a1c4801054d57d63043e3345 to your computer and use it in GitHub Desktop.
A simple socks server via python - updated for Pythonista (iOS)
#!python2
# -*- coding: utf-8 -*-
# 一个简单的 Socks5 代理服务器 , 只有 server 端 , 而且代码比较乱
# 不是很稳定 , 而且使用多线程并不是 select 模型
# Author : WangYihang <wangyihanger@gmail.com>
import socket
import threading
import sys
def handle(buffer):
return buffer
def transfer(src, dst):
src_name = src.getsockname()
src_address = src_name[0]
src_port = src_name[1]
dst_name = dst.getsockname()
dst_address = dst_name[0]
dst_port = dst_name[1]
print "[+] Starting transfer [%s:%d] => [%s:%d]" % (src_name, src_port, dst_name, dst_port)
while True:
buffer = src.recv(0x1000)
if not buffer:
print "[-] No data received! Breaking..."
break
# print "[+] %s:%d => %s:%d [%s]" % (src_address, src_port, dst_address, dst_port, repr(buffer))
# print "[+] %s:%d => %s:%d => Length : [%d]" % (src_address, src_port, dst_address, dst_port, len(buffer))
dst.send(handle(buffer))
print "[+] Closing connections! [%s:%d]" % (src_address, src_port)
src.close()
print "[+] Closing connections! [%s:%d]" % (dst_address, dst_port)
dst.close()
SOCKS_VERSION = 5
ERROR_VERSION = "[-] Client version error!"
ERROR_METHOD = "[-] Client method error!"
# ALLOWED_METHOD = [0, 2]
ALLOWED_METHOD = [0]
def socks_selection(sock):
client_version = ord(sock.recv(1))
print "[+] client version : %d" % (client_version)
if not client_version == SOCKS_VERSION:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
return (False, ERROR_VERSION)
support_method_number = ord(sock.recv(1))
print "[+] Client Supported method number : %d" % (support_method_number)
support_methods = []
for i in range(support_method_number):
method = ord(sock.recv(1))
print "[+] Client Method : %d" % (method)
support_methods.append(method)
selected_method = None
for method in ALLOWED_METHOD:
if method in support_methods:
selected_method = 0
if selected_method == None:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
return (False, ERROR_METHOD)
print "[+] Server select method : %d" % (selected_method)
response = chr(SOCKS_VERSION) + chr(selected_method)
sock.send(response)
return (True, sock)
CONNECT = 1
BIND = 2
UDP_ASSOCIATE = 3
IPV4 = 1
DOMAINNAME = 3
IPV6 = 4
CONNECT_SUCCESS = 0
ERROR_ATYPE = "[-] Client address error!"
RSV = 0
BNDADDR = "\x00" * 4
BNDPORT = "\x00" * 2
def socks_request(local_socket):
client_version = ord(local_socket.recv(1))
print "[+] client version : %d" % (client_version)
if not client_version == SOCKS_VERSION:
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_VERSION)
cmd = ord(local_socket.recv(1))
if cmd == CONNECT:
print "[+] CONNECT request from client"
rsv = ord(local_socket.recv(1))
if rsv != 0:
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_RSV)
atype = ord(local_socket.recv(1))
if atype == IPV4:
dst_address = ("".join(["%d." % (ord(i)) for i in local_socket.recv(4)]))[0:-1]
print "[+] IPv4 : %s" % (dst_address)
dst_port = ord(local_socket.recv(1)) * 0x100 + ord(local_socket.recv(1))
print "[+] Port : %s" % (dst_port)
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
print "[+] Connecting : %s:%s" % (dst_address, dst_port)
remote_socket.connect((dst_address, dst_port))
response = ""
response += chr(SOCKS_VERSION)
response += chr(CONNECT_SUCCESS)
response += chr(RSV)
response += chr(IPV4)
response += BNDADDR
response += BNDPORT
local_socket.send(response)
print "[+] Tunnel connected! Tranfering data..."
r = threading.Thread(target=transfer, args=(
local_socket, remote_socket))
r.daemon = True
r.start()
s = threading.Thread(target=transfer, args=(
remote_socket, local_socket))
s.daemon = True
s.start()
return (True, (local_socket, remote_socket))
except socket.error as e:
print e
remote_socket.shutdown(socket.SHUT_RDWR)
remote_socket.close()
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
elif atype == DOMAINNAME:
domainname_length = ord(local_socket.recv(1))
domainname = ""
for i in range(domainname_length):
domainname += (local_socket.recv(1))
print "[+] Domain name : %s" % (domainname)
dst_port = ord(local_socket.recv(1)) * 0x100 + ord(local_socket.recv(1))
print "[+] Port : %s" % (dst_port)
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
print "[+] Connecting : %s:%s" % (domainname, dst_port)
remote_socket.connect((domainname, dst_port))
response = ""
response += chr(SOCKS_VERSION)
response += chr(CONNECT_SUCCESS)
response += chr(RSV)
response += chr(IPV4)
response += BNDADDR
response += BNDPORT
local_socket.send(response)
print "[+] Tunnel connected! Tranfering data..."
r = threading.Thread(target=transfer, args=(
local_socket, remote_socket))
r.start()
s = threading.Thread(target=transfer, args=(
remote_socket, local_socket))
s.start()
return (True, (local_socket, remote_socket))
except socket.error as e:
print e
remote_socket.shutdown(socket.SHUT_RDWR)
remote_socket.close()
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
elif atype == IPV6:
dst_address = int(local_socket.recv(4).encode("hex"), 16)
print "[+] IPv6 : %x" % (dst_address)
dst_port = ord(local_socket.recv(1)) * 0x100 + ord(local_socket.recv(1))
print "[+] Port : %s" % (dst_port)
remote_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
remote_socket.connect((dst_address, dst_port))
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_ATYPE)
else:
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_ATYPE)
elif cmd == BIND:
# TODO
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_CMD)
elif cmd == UDP_ASSOCIATE:
# TODO
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_CMD)
else:
local_socket.shutdown(socket.SHUT_RDWR)
local_socket.close()
return (False, ERROR_CMD)
return (True, local_socket)
def server_loop(server_socket):
while True:
try:
local_socket, local_address = server_socket.accept()
except socket.timeout:
continue
local_socket.setblocking(True)
print '[+] Detect connection from [%s:%s]' % (local_address[0], local_address[1])
try:
result = socks_selection(local_socket)
if not result[0]:
print "[-] socks selection error!"
break
result = socks_request(result[1])
if not result[0]:
print "[-] socks request error!"
break
except Exception as e:
import traceback
traceback.print_exc()
print "[x] socks request exception:", e
def server(local_host, local_port, max_connection):
try:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((local_host, local_port))
server_socket.listen(max_connection)
server_socket.settimeout(1)
print '[+] Server started [%s:%d]' % (local_host, local_port)
t = threading.Thread(target=server_loop, args=(server_socket,))
t.daemon = True
t.start()
while t.isAlive():
t.join(1)
print "[+] Releasing resources..."
local_socket.close()
print "[+] Closing server..."
server_socket.close()
print "[+] Server shuted down!"
finally:
print '[!] Shutting down'
try:
server_socket.close()
except Exception as e:
print '[!] failed to shut down server socket:', e
def start_wpad_server(hhost, hport, phost, pport):
import BaseHTTPServer
class HTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_HEAD(s):
s.send_response(200)
s.send_header("Content-type", "application/x-ns-proxy-autoconfig")
s.end_headers()
def do_GET(s):
s.send_response(200)
s.send_header("Content-type", "application/x-ns-proxy-autoconfig")
s.end_headers()
s.wfile.write(("""
function FindProxyForURL(url, host)
{
if (isInNet(host, "192.168.0.0", "255.255.0.0")) {
return "DIRECT";
} else if (isInNet(host, "172.16.0.0", "255.240.0.0")) {
return "DIRECT";
} else if (isInNet(host, "10.0.0.0", "255.0.0.0")) {
return "DIRECT";
} else {
return "SOCKS %s:%d";
}
}
""" % (phost, pport)).strip())
server = BaseHTTPServer.HTTPServer((hhost, hport), HTTPHandler)
threading.Thread(target=server.serve_forever).start()
def main(argv):
if len(argv) != 3:
print "Usage : "
print "\tpython %s [L_HOST] [L_PORT]" % (argv[0])
print "Example : "
print "\tpython %s 127.0.0.1 1080" % (argv[0])
print "Author : "
print "\tWangYihang <wangyihanger@gmail.com>"
exit(1)
LOCAL_HOST = argv[1]
LOCAL_PORT = int(argv[2])
# TODO real local wifi IP
# (this one usually works for hotspotting)
lhost = "172.20.10.1"
wpad_port = 80
start_wpad_server(LOCAL_HOST, wpad_port, lhost, LOCAL_PORT)
print("PAC URL: http://{}:{}/wpad.dat".format(lhost, wpad_port))
#REMOTE_HOST = sys.argv[3]
#REMOTE_PORT = int(sys.argv[4])
MAX_CONNECTION = 0x10
server(LOCAL_HOST, LOCAL_PORT, MAX_CONNECTION)
if __name__ == "__main__":
main(['socks.py', '0.0.0.0', '10800'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment