Skip to content

Instantly share code, notes, and snippets.

@fduxiao
Forked from scturtle/server.py
Created February 17, 2018 16:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fduxiao/867e2723b4f76e52ab4f8824ddda4d90 to your computer and use it in GitHub Desktop.
Save fduxiao/867e2723b4f76e52ab4f8824ddda4d90 to your computer and use it in GitHub Desktop.
python socks5 proxy server with asyncio (async/await)
#!/usr/bin/env python3.5
import socket
import asyncio
from struct import pack, unpack
class Client(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.server_transport = None
def data_received(self, data):
# print('recv:', repr(data))
self.server_transport.write(data)
def connection_lost(self, *args):
self.server_transport.close()
class Server(asyncio.Protocol):
INIT, HOST, DATA = 0, 1, 2
def connection_made(self, transport):
print('from:', transport.get_extra_info('peername'))
self.transport = transport
self.state = self.INIT
def connection_lost(self, exc):
self.transport.close()
def data_received(self, data):
# print('send:', repr(data))
if self.state == self.INIT:
assert data[0] == 0x05
self.transport.write(pack('!BB', 0x05, 0x00)) # no auth
self.state = self.HOST
elif self.state == self.HOST:
ver, cmd, rsv, atype = data[:4]
assert ver == 0x05 and cmd == 0x01
if atype == 3: # domain
length = data[4]
hostname, nxt = data[5:5+length], 5+length
elif atype == 1: # ipv4
hostname, nxt = socket.inet_ntop(socket.AF_INET, data[4:8]), 8
elif atype == 4: # ipv6
hostname, nxt = socket.inet_ntop(socket.AF_INET6, data[4:20]), 20
port = unpack('!H', data[nxt:nxt+2])[0]
print('to:', hostname, port)
asyncio.ensure_future(self.connect(hostname, port))
self.state = self.DATA
elif self.state == self.DATA:
self.client_transport.write(data)
async def connect(self, hostname, port):
loop = asyncio.get_event_loop()
transport, client = \
await loop.create_connection(Client, hostname, port)
client.server_transport = self.transport
self.client_transport = transport
hostip, port = transport.get_extra_info('sockname')
host = unpack("!I", socket.inet_aton(hostip))[0]
self.transport.write(
pack('!BBBBIH', 0x05, 0x00, 0x00, 0x01, host, port))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
srv = loop.create_server(Server, 'localhost', 8000)
loop.run_until_complete(srv)
loop.run_forever()
#!/usr/bin/env python3.5
import struct
import asyncio
import socket
from socket import gethostbyname, inet_aton, inet_ntoa
class Client(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
# print('recv:', repr(data))
self.server.write(data)
def connection_lost(self, *args):
self.server.close()
async def handler(reader, writer):
async def read(fmt, size):
return struct.unpack(fmt, await reader.read(size))
def write(*argv):
writer.write(struct.pack(*argv))
loop = asyncio.get_event_loop()
async def call(func, *argv):
return await loop.run_in_executor(None, func, *argv)
ver, meth = await read('BB', 2)
assert ver == 0x05
await reader.read(meth)
write('!BB', 0x05, 0x00) # no auth
ver, cmd, rsv, atype = await read('BBBB', 4)
assert ver == 0x05 and cmd == 0x01
if atype == 0x03: # domain
length = (await read('B', 1))[0]
hostname = (await read("!{}s".format(length), length))[0]
hostip = socket.gethostbyname(hostname)
host = struct.unpack("!I", inet_aton(hostip))[0]
elif atype == 0x01: # ipv4
host = await reader.read(4)
hostname = isocket.inet_ntop(socket.AF_INET, host)
elif atype == 0x03: # ipv6
host = await reader.read(16)
hostname = isocket.inet_ntop(socket.AF_INET, host)
port = (await read('!H', 2))[0]
print('hostname:', hostname, 'port:', port)
transport, client = await loop.create_connection(Client, hostname, port)
client.server = writer
write('!BBBBIH', 0x05, 0x00, 0x00, 0x01, host, port)
data = await reader.read(8192)
while data:
client.transport.write(data)
if len(data) < 8192:
break
data = await reader.read(8192)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.start_server(handler, '0.0.0.0', 8000))
try:
loop.run_forever()
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment