Skip to content

Instantly share code, notes, and snippets.

@nom3ad
Created November 1, 2021 12:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nom3ad/17b38f0c2d3cebc9a2d99b2b0c14f995 to your computer and use it in GitHub Desktop.
Save nom3ad/17b38f0c2d3cebc9a2d99b2b0c14f995 to your computer and use it in GitHub Desktop.
Python 3 support for pystun (https://github.com/jtriley/pystun)
#!/usr/bin/env python
# https://github.com/jfsanchez91/pystun3/blob/py3-support/stun/__init__.py
import binascii
import logging
import random
import socket
import sys
__version__ = "0.1.0"

log = logging.getLogger("pystun")

# Public STUN servers tried when the caller does not name one.
# Disabled entries: stun.voiparound.com, stun.voxgratia.org
STUN_SERVERS = (
    "stun.ekiga.net",
    "stun.ideasip.com",
    "stun.voipbuster.com",
    "stun.voipstunt.com",
)
stun_servers_list = STUN_SERVERS

# Default values offered by the command-line interface.
DEFAULTS = {
    "stun_port": 3478,
    "source_ip": "0.0.0.0",
    "source_port": 54320,
}

# STUN attribute type codes, written as 4-digit hex strings.
MappedAddress = "0001"
ResponseAddress = "0002"
ChangeRequest = "0003"
SourceAddress = "0004"
ChangedAddress = "0005"
Username = "0006"
Password = "0007"
MessageIntegrity = "0008"
ErrorCode = "0009"
UnknownAttribute = "000A"
ReflectedFrom = "000B"
XorOnly = "0021"
XorMappedAddress = "8020"
ServerName = "8022"
SecondaryAddress = "8050"  # Non standard extension

# STUN message type codes, written as 4-digit hex strings.
BindRequestMsg = "0001"
BindResponseMsg = "0101"
BindErrorResponseMsg = "0111"
SharedSecretRequestMsg = "0002"
SharedSecretResponseMsg = "0102"
SharedSecretErrorResponseMsg = "0112"

# Name -> code lookup for attributes.
dictAttrToVal = {
    "MappedAddress": MappedAddress,
    "ResponseAddress": ResponseAddress,
    "ChangeRequest": ChangeRequest,
    "SourceAddress": SourceAddress,
    "ChangedAddress": ChangedAddress,
    "Username": Username,
    "Password": Password,
    "MessageIntegrity": MessageIntegrity,
    "ErrorCode": ErrorCode,
    "UnknownAttribute": UnknownAttribute,
    "ReflectedFrom": ReflectedFrom,
    "XorOnly": XorOnly,
    "XorMappedAddress": XorMappedAddress,
    "ServerName": ServerName,
    "SecondaryAddress": SecondaryAddress,
}

# Name -> code lookup for message types.
dictMsgTypeToVal = {
    "BindRequestMsg": BindRequestMsg,
    "BindResponseMsg": BindResponseMsg,
    "BindErrorResponseMsg": BindErrorResponseMsg,
    "SharedSecretRequestMsg": SharedSecretRequestMsg,
    "SharedSecretResponseMsg": SharedSecretResponseMsg,
    "SharedSecretErrorResponseMsg": SharedSecretErrorResponseMsg,
}

# Code -> name lookups; they start empty and are filled in by _initialize().
dictValToMsgType = {}
dictValToAttr = {}

# Labels returned as the NAT classification. These are runtime strings, so
# their spelling is kept exactly as callers may already compare against it.
Blocked = "Blocked"
OpenInternet = "Open Internet"
FullCone = "Full Cone"
SymmetricUDPFirewall = "Symmetric UDP Firewall"
RestricNAT = "Restric NAT"
RestricPortNAT = "Restric Port NAT"
SymmetricNAT = "Symmetric NAT"
ChangedAddressError = "Meet an error, when do Test1 on Changed IP and Port"
def _dict_items(_dict):
    """Return the ``(key, value)`` pairs of ``_dict``.

    On Python 3 the items view is copied into a list; on Python 2 the result
    of ``items()`` is returned unchanged.
    """
    pairs = _dict.items()
    if sys.version_info < (3,):
        return pairs
    return list(pairs)
def _b2a_hex(data):
    """Return the hexadecimal representation of ``data``.

    On Python 3 the bytes produced by ``binascii.b2a_hex`` are decoded to
    ``str``; on Python 2 they are returned as produced.
    """
    hexed = binascii.b2a_hex(data)
    return hexed.decode() if sys.version_info >= (3,) else hexed
def _initialize():
    """Fill the code -> name tables from the name -> code tables.

    ``dictValToAttr`` is populated from ``dictAttrToVal`` and
    ``dictValToMsgType`` from ``dictMsgTypeToVal``. Calling it more than once
    just rewrites the same entries.
    """
    for attr_name, attr_code in _dict_items(dictAttrToVal):
        dictValToAttr[attr_code] = attr_name
    for msg_name, msg_code in _dict_items(dictMsgTypeToVal):
        dictValToMsgType[msg_code] = msg_name
def gen_tran_id():
    """Return a random transaction id as 32 upper-case hex characters.

    The id is returned as hex text (not raw bytes) so callers can join it
    with the other hex-encoded parts of a request.
    """
    hex_digits = '0123456789ABCDEF'
    picked = [random.choice(hex_digits) for _ in range(32)]
    return ''.join(picked)
def _parse_address_attr(buf, base):
    """Decode the address attribute whose 4-byte header starts at ``base``.

    The port is read from the two bytes at ``base + 6`` and the IPv4 address
    from the four bytes at ``base + 8``; the two bytes in between the header
    and the port are skipped.

    Returns an ``(ip, port)`` tuple, or ``None`` when ``buf`` is too short to
    hold the full attribute value.
    """
    value = buf[base + 4:base + 12]
    if len(value) < 8:
        return None
    port = int(_b2a_hex(value[2:4]), 16)
    ip = ".".join(
        str(int(_b2a_hex(value[i:i + 1]), 16)) for i in range(4, 8)
    )
    return ip, port


def stun_test(sock, host, port, source_ip, source_port, send_data=""):
    """Send one STUN Binding Request and parse the matching response.

    Args:
        sock: UDP socket used for ``sendto``/``recvfrom``. It is not closed
            here.
        host: STUN server host to send the request to.
        port: STUN server port to send the request to.
        source_ip: Local address; only used in the debug log line.
        source_port: Local port; only used in the debug log line.
        send_data: Hex text of the attributes to append to the request
            (two hex characters per byte). Empty for a plain request.

    Returns:
        A dict with keys ``Resp``, ``ExternalIP``, ``ExternalPort``,
        ``SourceIP``, ``SourcePort``, ``ChangedIP`` and ``ChangedPort``.
        ``Resp`` is ``True`` only when a Binding Response carrying this
        request's transaction id was received; the address fields stay
        ``None`` unless the corresponding attribute was present.
    """
    retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None,
              'SourceIP': None, 'SourcePort': None, 'ChangedIP': None,
              'ChangedPort': None}
    # Which result keys each address-carrying attribute fills in.
    address_fields = {
        MappedAddress: ('ExternalIP', 'ExternalPort'),
        SourceAddress: ('SourceIP', 'SourcePort'),
        ChangedAddress: ('ChangedIP', 'ChangedPort'),
    }
    # The whole packet is assembled as hex text, so the payload length (in
    # bytes) must be written in hex as well, not decimal.
    str_len = "%04x" % (len(send_data) // 2)
    tranid = gen_tran_id()
    str_data = ''.join([BindRequestMsg, str_len, tranid, send_data])
    data = binascii.a2b_hex(str_data)
    recvCorr = False
    while not recvCorr:
        recieved = False
        count = 3  # retries after the first attempt
        while not recieved:
            log.debug("sendto: %s -> %s", (source_ip, source_port),
                      (host, port))
            try:
                sock.sendto(data, (host, port))
            except socket.gaierror:
                # Host name could not be resolved: report "no response".
                return retVal
            try:
                buf, addr = sock.recvfrom(2048)
            except socket.error:
                # Timeout or other socket failure: resend until the retry
                # budget is used up.
                if count > 0:
                    count -= 1
                else:
                    return retVal
            else:
                log.debug("recvfrom: %s", addr)
                recieved = True
        msgtype = _b2a_hex(buf[0:2])
        # Compare against the constant directly so an unknown message type
        # (or an unpopulated lookup table) cannot raise KeyError.
        bind_resp_msg = msgtype == BindResponseMsg
        tranid_match = tranid.upper() == _b2a_hex(buf[4:20]).upper()
        if bind_resp_msg and tranid_match:
            recvCorr = True
            retVal['Resp'] = True
            len_message = int(_b2a_hex(buf[2:4]), 16)
            # Never walk past the bytes actually received, even if the
            # advertised length says otherwise.
            end = min(20 + len_message, len(buf))
            base = 20
            while base + 4 <= end:
                attr_type = _b2a_hex(buf[base:(base + 2)])
                attr_len = int(_b2a_hex(buf[(base + 2):(base + 4)]), 16)
                fields = address_fields.get(attr_type)
                if fields is not None:
                    parsed = _parse_address_attr(buf, base)
                    if parsed is not None:
                        ip_key, port_key = fields
                        retVal[ip_key], retVal[port_key] = parsed
                base = base + 4 + attr_len
    return retVal
def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478):
    """Classify the NAT in front of ``s`` by running a series of STUN tests.

    Args:
        s: Bound UDP socket used for every test. It is not closed here.
        source_ip: Local address the socket is bound to; compared with the
            externally observed address to detect the no-NAT case.
        source_port: Local port the socket is bound to.
        stun_host: STUN server to use. When falsy, the servers in
            ``stun_servers_list`` are tried in random order until one
            answers.
        stun_port: Port of the STUN server.

    Returns:
        A ``(nat_type, result)`` tuple: ``nat_type`` is one of the module's
        NAT label strings and ``result`` is the dict returned by the last
        ``stun_test`` call. If that last call carried no external address,
        the one learned from the first successful test is filled in.

    Raises:
        Exception: when no ``stun_host`` was given and none of the default
            servers answered.
    """
    _initialize()
    port = stun_port
    log.debug("Do Test1")
    resp = False
    if stun_host:
        ret = stun_test(s, stun_host, port, source_ip, source_port)
        resp = ret['Resp']
    else:
        stun_candidates = list(stun_servers_list)
        random.shuffle(stun_candidates)
        for host in stun_candidates:
            log.debug('Trying STUN host: %s', host)
            ret = stun_test(s, host, port, source_ip, source_port)
            resp = ret['Resp']
            if resp:
                stun_host = host
                break
        else:
            # Loop finished without a break: nobody answered.
            raise Exception("unreachable stun servers")
    if not resp:
        return Blocked, ret
    log.debug("Result: %s", ret)
    exIP = ret['ExternalIP']
    exPort = ret['ExternalPort']
    changedIP = ret['ChangedIP']
    changedPort = ret['ChangedPort']
    # Ask the server to answer from a different IP and port.
    changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
    if exIP == source_ip:
        ret = stun_test(s, stun_host, port, source_ip, source_port,
                        changeRequest)
        typ = OpenInternet if ret['Resp'] else SymmetricUDPFirewall
    else:
        log.debug("Do Test2")
        ret = stun_test(s, stun_host, port, source_ip, source_port,
                        changeRequest)
        log.debug("Result: %s", ret)
        if ret['Resp']:
            typ = FullCone
        elif changedIP is None or changedPort is None:
            # The server did not advertise an alternate address, so the
            # remaining tests cannot be run; sending to (None, None) would
            # raise instead of returning a result.
            typ = ChangedAddressError
        else:
            log.debug("Do Test1")
            ret = stun_test(s, changedIP, changedPort, source_ip, source_port)
            log.debug("Result: %s", ret)
            if not ret['Resp']:
                typ = ChangedAddressError
            elif exIP == ret['ExternalIP'] and exPort == ret['ExternalPort']:
                # Ask the server to answer from a different port only.
                changePortRequest = ''.join([ChangeRequest, '0004',
                                             "00000002"])
                log.debug("Do Test3")
                ret = stun_test(s, changedIP, port, source_ip, source_port,
                                changePortRequest)
                log.debug("Result: %s", ret)
                typ = RestricNAT if ret['Resp'] else RestricPortNAT
            else:
                typ = SymmetricNAT
    # A later test that got no reply leaves the address fields empty; keep
    # the mapping learned from the first test so callers still see it.
    if ret['ExternalIP'] is None:
        ret['ExternalIP'] = exIP
        ret['ExternalPort'] = exPort
    return typ, ret
def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None,
                stun_port=3478):
    """Open a UDP socket, run the NAT detection and report the result.

    Args:
        source_ip: Local address to bind the socket to.
        source_port: Local port to bind the socket to.
        stun_host: STUN server to query; passed through to ``get_nat_type``.
        stun_port: Port of the STUN server.

    Returns:
        A ``(nat_type, external_ip, external_port)`` tuple.

    Raises:
        Whatever ``bind`` or ``get_nat_type`` raises; the socket is closed
        before the exception propagates.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.settimeout(2)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((source_ip, source_port))
        nat_type, nat = get_nat_type(s, source_ip, source_port,
                                     stun_host=stun_host, stun_port=stun_port)
    finally:
        # Always release the socket, including when bind or a test fails.
        s.close()
    external_ip = nat['ExternalIP']
    external_port = nat['ExternalPort']
    return (nat_type, external_ip, external_port)
# https://github.com/jfsanchez91/pystun3/blob/py3-support/stun/cli.py
# from __future__ import print_function
import argparse
import logging
import sys
# import stun
# The CLI code below refers to ``stun.<name>``; in this single-file version
# the name is bound to this very module so those lookups resolve here.
stun = sys.modules[__name__]
def make_argument_parser():
    """Build the command-line parser for the STUN client.

    Options: ``-d/--debug``, ``-H/--stun-host``, ``-P/--stun-port``,
    ``-i/--source-ip``, ``-p/--source-port`` and ``--version``. Port and
    source defaults come from ``stun.DEFAULTS``.
    """
    defaults = stun.DEFAULTS
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # (flags, keyword arguments) in the order they appear in --help.
    option_specs = (
        (('-d', '--debug'),
         {'action': 'store_true', 'help': 'Enable debug logging'}),
        (('-H', '--stun-host'),
         {'help': 'STUN host to use'}),
        (('-P', '--stun-port'),
         {'type': int, 'default': defaults['stun_port'],
          'help': 'STUN host port to use'}),
        (('-i', '--source-ip'),
         {'default': defaults['source_ip'],
          'help': 'network interface for client'}),
        (('-p', '--source-port'),
         {'type': int, 'default': defaults['source_port'],
          'help': 'port to listen on for client'}),
        (('--version',),
         {'action': 'version', 'version': stun.__version__}),
    )
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
    return parser
def main():
    """Command-line entry point: parse options, run the test, print results.

    A ``KeyboardInterrupt`` at any point ends the program via ``sys.exit()``.
    """
    try:
        options = make_argument_parser().parse_args()
        if options.debug:
            logging.basicConfig()
            stun.log.setLevel(logging.DEBUG)
        request = {
            'source_ip': options.source_ip,
            'source_port': options.source_port,
            'stun_host': options.stun_host,
            'stun_port': options.stun_port,
        }
        nat_type, external_ip, external_port = stun.get_ip_info(**request)
        report = (
            ('NAT Type:', nat_type),
            ('External IP:', external_ip),
            ('External Port:', external_port),
        )
        for label, value in report:
            print(label, value)
    except KeyboardInterrupt:
        sys.exit()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment