Skip to content

Instantly share code, notes, and snippets.

@tcalmant
Last active January 17, 2021 21:00
Show Gist options
  • Save tcalmant/770511b420c1139fe1cc88c2685f2903 to your computer and use it in GitHub Desktop.
Save tcalmant/770511b420c1139fe1cc88c2685f2903 to your computer and use it in GitHub Desktop.
A working SOCKS5 server based on asyncio (TCP Connect only)
#!/usr/bin/env python3
"""
Asyncio-based SOCKS5 Proxy
:author: Thomas Calmant
:copyright: Copyright 2017, Thomas Calmant
:license: Apache License 2.0
:version: 0.0.1
"""
import argparse
import asyncio
import enum
import logging
import socket
import struct
import sys
# ------------------------------------------------------------------------------
# Module version
__version_info__ = (0, 0, 1)
__version__ = ".".join(str(x) for x in __version_info__)
# Documentation strings format
__docformat__ = "restructuredtext en"
NO_ADDRESS = socket.inet_aton("0.0.0.0")
# ------------------------------------------------------------------------------
class Command(enum.IntEnum):
"""
SOCKS5 request command type
"""
CONNECT = 0x01
BIND = 0x02
UDP_ASSOCIATE = 0x03
class AddressType(enum.IntEnum):
"""
SOCKS5 address type
"""
IPV4 = 0x01
DOMAINE_NAME = 0x03
IPV6 = 0x04
class ReplyCode(enum.IntEnum):
"""
SOCKS5 reply code
"""
SUCCESS = 0x00
SERVER_FAILURE = 0x01
NOT_ALLOWED = 0x02
NETWORK_UNREACHABLE = 0x03
HOST_UNREACHABLE = 0x04
CONNECTION_REFUSED = 0x05
TTL_EXPIRED = 0x06
COMMAND_NOT_SUPPORTED = 0x07
ADDRESS_NOT_SUPPORTED = 0x08
# ------------------------------------------------------------------------------
class ClientProtocol(asyncio.Protocol):
"""
SOCKS5 Client protocol definition
"""
def __init__(self, writer):
"""
Sets up members
:param writer: Data output stream
"""
super().__init__()
self._transport = None
self._writer = writer
def connection_made(self, transport):
"""
A connection has been made
:param transport: The Transport representing the connection
"""
self._transport = transport
def data_received(self, data):
"""
Data has been received from the client
:param data: Received data
"""
self._writer.write(data)
def connection_lost(self, exc):
"""
Connection to server lost
:param exc: An exception object or None
"""
self._writer.close()
def write(self, data):
"""
Write data back
:param data: Data to write
"""
self._transport.write(data)
# ------------------------------------------------------------------------------
async def io_handler(reader, writer):
"""
The asyncio connection handler
:param reader: Input stream
:param writer: Output stream
"""
async def read(fmt):
"""
Unpacks data from the input stream
:param fmt: Data format
:return: The unpacked data
"""
return struct.unpack(fmt, await reader.read(struct.calcsize(fmt)))
def write(fmt, *argv):
"""
Packs data to the writer
:param fmt: Pack format
:param argv: Pack content
"""
writer.write(struct.pack(fmt, *argv))
def write_reply(code, address_type=AddressType.IPV4,
raw_address=NO_ADDRESS, port=0):
"""
Writes a reply
:param code: Reply code
:param address_type: Kind of address
:param raw_address: Raw target IP address
:param port: Target port
"""
# Version, Response Code, Reserved, Address Type,
# Target Raw Address, Target Port
write('!BBBB', 0x05, code, 0x00, address_type.value)
writer.write(raw_address)
write('!H', port)
# The client connects to the server, and sends a version
# identifier/method selection message
try:
version, nb_methods = await read('BB')
except struct.error:
write_reply(ReplyCode.SERVER_FAILURE)
writer.close()
return
if version != 0x05:
write_reply(ReplyCode.SERVER_FAILURE)
writer.close()
return
# Ignore the methods
await reader.read(nb_methods)
# Sends the server "selected" method: no authentication
write('BB', version, 0x00)
# Read the header of the request
version, cmd, _, address_type = await read('BBBB')
if cmd != Command.CONNECT:
write_reply(ReplyCode.COMMAND_NOT_SUPPORTED)
writer.close()
return
if address_type == AddressType.IPV4:
# IPv4 connection
raw_address = await reader.read(4)
address = socket.inet_ntop(socket.AF_INET, raw_address)
elif address_type == AddressType.IPV6:
# IPv6 connection
raw_address = await reader.read(16)
address = socket.inet_ntop(socket.AF_INET6, raw_address)
elif address_type == AddressType.DOMAINE_NAME:
# DNS resolution
length = (await read('B'))[0]
hostname = (await read("!{}s".format(length)))[0]
address = socket.gethostbyname(hostname)
else:
write_reply(ReplyCode.ADDRESS_NOT_SUPPORTED)
writer.close()
raise IOError("Unhandled address type: {:x}".format(address_type))
port = (await read('!H'))[0]
# Get the client address (for logs)
clt_address = writer.get_extra_info("socket").getsockname()[0]
# Prepare the client handler
loop = asyncio.get_event_loop()
try:
transport, client = await loop.create_connection(
lambda: ClientProtocol(writer), address, port)
except IOError as ex:
logging.info("%s => Error accessing %s %s: %s",
clt_address, address, port, ex)
write_reply(ReplyCode.HOST_UNREACHABLE)
writer.close()
return
else:
logging.info("%s => %s %d", clt_address, address, port)
# Get our own details
server_socket = transport.get_extra_info("socket")
bind_address, bind_port = server_socket.getsockname()[:2]
bind_address = socket.inet_pton(server_socket.family, bind_address)
if server_socket.family == socket.AF_INET:
address_type = AddressType.IPV4
elif server_socket.family == socket.AF_INET6:
address_type = AddressType.IPV6
# Write a reply
write_reply(ReplyCode.SUCCESS, address_type, bind_address, bind_port)
data = await reader.read(8192)
while data:
client.write(data)
data = await reader.read(8192)
# ------------------------------------------------------------------------------
def main(args=None):
"""
Script entry point
:param args: Program arguments
:return: An exit code
"""
# Parse arguments
parser = argparse.ArgumentParser(description="SOCKS5 Proxy Server")
parser.add_argument("-l", "--listen", default="0.0.0.0",
metavar="ADDRESS", help="Bind address to listen")
parser.add_argument("-p", "--port", type=int, default=1080,
help="Server port")
parser.add_argument("-d", "--debug", action="store_true",
help="Log debug messages")
opts = parser.parse_args(args)
if opts.port < 1:
print("A valid port must be given (random port not supported)")
return 1
if opts.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Start the server
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(
io_handler, opts.listen, opts.port))
# Print access information
print("SOCKS5 Server listening on:")
for srv_socket in server.sockets:
address, port = srv_socket.getsockname()[:2]
print("-", address, port)
logging.info("Proxy bound to %s %d", address, port)
# Loop forever
try:
loop.run_forever()
except KeyboardInterrupt:
logging.warning("Proxy interrupted.")
print("Got Ctrl+C interrupt")
return 128
except Exception as ex:
logging.exception("Error running proxy: %s", ex)
raise
finally:
logging.warning("Proxy stopped.")
loop.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.WARNING)
sys.exit(main() or 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment