Skip to content

Instantly share code, notes, and snippets.

@scturtle
Last active May 13, 2023 23:56
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save scturtle/7967cb4e7c2bb0f91ca5 to your computer and use it in GitHub Desktop.
Save scturtle/7967cb4e7c2bb0f91ca5 to your computer and use it in GitHub Desktop.
python socks5 proxy server with asyncio (async/await)
#!/usr/bin/env python3.5
import socket
import asyncio
from struct import pack, unpack
class Client(asyncio.Protocol):
    """Protocol for the outbound (proxy -> target) connection.

    Bytes received from the target are written to ``server_transport``, the
    transport of the SOCKS client this connection serves.  The attribute is
    ``None`` until the code that opened the connection assigns it.
    """

    def connection_made(self, transport):
        """Store our own transport; the peer transport is attached later."""
        self.transport = transport
        self.server_transport = None

    def data_received(self, data):
        """Relay a chunk from the target back to the SOCKS client."""
        self.server_transport.write(data)

    def connection_lost(self, *args):
        """Close the SOCKS client's transport once the target is gone."""
        # The peer may never have been attached (e.g. the connection dropped
        # first), in which case there is nothing to close.
        if self.server_transport is not None:
            self.server_transport.close()
class Server(asyncio.Protocol):
    """SOCKS5 protocol handler for one inbound client connection.

    The connection moves through three states:

    * ``INIT`` - waiting for the method-selection greeting, which is answered
      with "no authentication required".
    * ``HOST`` - waiting for the CONNECT request that names the target.
    * ``DATA`` - handshake accepted; incoming bytes are relayed to the target.

    Only the CONNECT command (0x01) is supported.  Each handshake message is
    expected to arrive in a single ``data_received`` call; messages split or
    merged by TCP are not reassembled.
    """

    INIT, HOST, DATA = 0, 1, 2

    def connection_made(self, transport):
        """Record the client transport and start in the ``INIT`` state."""
        print('from:', transport.get_extra_info('peername'))
        self.transport = transport
        self.client_transport = None  # upstream transport, set by connect()
        self._pending = []  # client data received before upstream is ready
        self.state = self.INIT

    def connection_lost(self, exc):
        """Tear down both directions when the SOCKS client disconnects."""
        self.transport.close()
        # Without this the upstream socket would stay open after the client
        # has gone away.
        if self.client_transport is not None:
            self.client_transport.close()

    def data_received(self, data):
        """Advance the handshake or relay payload, depending on the state."""
        if self.state == self.INIT:
            if not data or data[0] != 0x05:
                # Not SOCKS5: there is no reply format to use, just hang up.
                self.transport.close()
                return
            self.transport.write(pack('!BB', 0x05, 0x00))  # no auth
            self.state = self.HOST
        elif self.state == self.HOST:
            target = self._parse_request(data)
            if target is None:
                return
            hostname, port = target
            print('to:', hostname, port)
            asyncio.ensure_future(self.connect(hostname, port))
            self.state = self.DATA
        elif self.state == self.DATA:
            if self.client_transport is None:
                # Upstream connection still being established; keep the data
                # until connect() can flush it.
                self._pending.append(data)
            else:
                self.client_transport.write(data)

    def _reject(self, code):
        """Send a failure reply carrying REP ``code`` and close the client."""
        if not self.transport.is_closing():
            self.transport.write(
                pack('!BBBBIH', 0x05, code, 0x00, 0x01, 0, 0))
        self.transport.close()

    def _parse_request(self, data):
        """Return ``(hostname, port)`` from a CONNECT request.

        On a malformed or unsupported request the client is rejected and
        ``None`` is returned.  A domain name is returned as ``bytes``; IPv4
        and IPv6 addresses are returned in their text form.
        """
        if len(data) < 5:
            self._reject(0x01)  # general failure: truncated request
            return None
        ver, cmd, _rsv, atype = data[:4]
        if ver != 0x05:
            self.transport.close()
            return None
        if cmd != 0x01:
            self._reject(0x07)  # command not supported
            return None
        if atype == 0x03:  # domain: one length byte, then the name
            start = 5
            end = start + data[4]
        elif atype == 0x01:  # ipv4
            start, end = 4, 8
        elif atype == 0x04:  # ipv6
            start, end = 4, 20
        else:
            self._reject(0x08)  # address type not supported
            return None
        if len(data) < end + 2:
            self._reject(0x01)  # address or port cut short
            return None
        raw = data[start:end]
        if atype == 0x01:
            hostname = socket.inet_ntop(socket.AF_INET, raw)
        elif atype == 0x04:
            hostname = socket.inet_ntop(socket.AF_INET6, raw)
        else:
            hostname = raw
        port = unpack('!H', data[end:end + 2])[0]
        return hostname, port

    async def connect(self, hostname, port):
        """Open the upstream connection and send the SOCKS5 reply.

        On success the reply carries the local address and port of the
        upstream socket, and any client data buffered meanwhile is flushed.
        On failure a SOCKS5 error reply is sent and the client is closed.
        """
        loop = asyncio.get_event_loop()
        try:
            transport, client = \
                await loop.create_connection(Client, hostname, port)
        except OSError as exc:
            refused = isinstance(exc, ConnectionRefusedError)
            self._reject(0x05 if refused else 0x01)
            return
        if self.transport.is_closing():
            # The SOCKS client left while we were connecting.
            transport.close()
            return
        client.server_transport = self.transport
        self.client_transport = transport
        sockname = transport.get_extra_info('sockname')
        hostip, port = sockname[0], sockname[1]
        if ':' in hostip:
            # IPv6 socket: sockname is a 4-tuple and the address may carry a
            # "%scope" suffix that inet_pton does not accept.
            atype = 0x04
            addr = socket.inet_pton(socket.AF_INET6, hostip.split('%')[0])
        else:
            atype = 0x01
            addr = socket.inet_aton(hostip)
        self.transport.write(
            pack('!BBBB', 0x05, 0x00, 0x00, atype) + addr + pack('!H', port))
        if self._pending:
            transport.write(b''.join(self._pending))
            self._pending = []
if __name__ == '__main__':
    # Serve SOCKS5 on localhost:8000 until interrupted, then release the
    # listening socket and the event loop.
    loop = asyncio.get_event_loop()
    server = loop.run_until_complete(
        loop.create_server(Server, 'localhost', 8000))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
#!/usr/bin/env python3.5
import struct
import asyncio
import socket
from socket import gethostbyname, inet_aton, inet_ntoa
class Client(asyncio.Protocol):
    """Protocol for the outbound (proxy -> target) connection.

    Bytes received from the target are written to ``server``, the stream
    writer of the SOCKS client this connection serves.  The attribute is
    ``None`` until the code that opened the connection assigns it.
    """

    def connection_made(self, transport):
        """Store our own transport; the peer writer is attached later."""
        self.transport = transport
        self.server = None

    def data_received(self, data):
        """Relay a chunk from the target back to the SOCKS client."""
        self.server.write(data)

    def connection_lost(self, *args):
        """Close the SOCKS client's writer once the target is gone."""
        # The writer may never have been attached; nothing to close then.
        if self.server is not None:
            self.server.close()
async def handler(reader, writer):
    """Serve one SOCKS5 client on a stream reader/writer pair.

    Performs the no-authentication handshake, accepts a CONNECT request for
    a domain name, IPv4 or IPv6 target, opens the upstream connection and
    then copies client data to it until the client stops sending.  Data from
    the target flows back through the ``Client`` protocol, which writes to
    ``writer``.

    Malformed or unsupported requests are answered with a SOCKS5 failure
    reply where the protocol allows one, and the connection is closed.
    """
    loop = asyncio.get_event_loop()

    async def read(fmt, size):
        # readexactly: a plain read() may return fewer bytes than requested.
        return struct.unpack(fmt, await reader.readexactly(size))

    def write(*argv):
        writer.write(struct.pack(*argv))

    def fail(code):
        # Failure reply with REP=code and an all-zero IPv4 bound address.
        write('!BBBBIH', 0x05, code, 0x00, 0x01, 0, 0)
        writer.close()

    try:
        ver, nmethods = await read('BB', 2)
        if ver != 0x05:
            writer.close()
            return
        await reader.readexactly(nmethods)  # offered methods are ignored
        write('!BB', 0x05, 0x00)  # no auth

        ver, cmd, _rsv, atype = await read('BBBB', 4)
        if ver != 0x05:
            writer.close()
            return
        if cmd != 0x01:
            fail(0x07)  # command not supported
            return
        if atype == 0x03:  # domain
            length = (await read('B', 1))[0]
            hostname = await reader.readexactly(length)
        elif atype == 0x01:  # ipv4
            hostname = socket.inet_ntop(
                socket.AF_INET, await reader.readexactly(4))
        elif atype == 0x04:  # ipv6
            hostname = socket.inet_ntop(
                socket.AF_INET6, await reader.readexactly(16))
        else:
            fail(0x08)  # address type not supported
            return
        port = (await read('!H', 2))[0]
    except (asyncio.IncompleteReadError, ConnectionError):
        # Client vanished in the middle of the handshake.
        writer.close()
        return
    print('hostname:', hostname, 'port:', port)

    # create_connection resolves names without blocking the event loop, so
    # no separate gethostbyname() call is needed.
    try:
        transport, client = await loop.create_connection(
            Client, hostname, port)
    except OSError as exc:
        fail(0x05 if isinstance(exc, ConnectionRefusedError) else 0x01)
        return
    client.server = writer

    # Report the local address of the upstream socket as BND.ADDR/BND.PORT.
    sockname = transport.get_extra_info('sockname')
    bind_ip, bind_port = sockname[0], sockname[1]
    if ':' in bind_ip:
        # IPv6 socket; drop a possible "%scope" suffix before packing.
        bind_type = 0x04
        bind_addr = socket.inet_pton(socket.AF_INET6, bind_ip.split('%')[0])
    else:
        bind_type = 0x01
        bind_addr = socket.inet_aton(bind_ip)
    writer.write(struct.pack('!BBBB', 0x05, 0x00, 0x00, bind_type)
                 + bind_addr + struct.pack('!H', bind_port))

    try:
        # A short read only means less data was available, not end of
        # stream; keep relaying until read() returns b'' (EOF).
        while True:
            data = await reader.read(8192)
            if not data:
                break
            transport.write(data)
    except ConnectionError:
        transport.close()
        writer.close()
        return
    # The client finished sending.  Pass the EOF upstream and leave the
    # return path open; Client.connection_lost closes the writer when the
    # target hangs up.
    if transport.can_write_eof():
        transport.write_eof()
    else:
        transport.close()
if __name__ == '__main__':
    # Start the SOCKS5 listener on every interface (0.0.0.0), port 8000,
    # and run until the loop is stopped; the loop is always closed on exit.
    event_loop = asyncio.get_event_loop()
    listening = asyncio.start_server(handler, '0.0.0.0', 8000)
    event_loop.run_until_complete(listening)
    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
@Sherlock-Holo
Copy link

When I tried to write a SOCKS server with asyncio, I had read that data_received() is called whenever some data arrives, but I didn't know how to structure my code until I read your server.py. It is amazing! Your code broadened my horizons. Thanks for your excellent code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment