Skip to content

Instantly share code, notes, and snippets.

@HuangFJ
Created August 24, 2014 17:03
Show Gist options
  • Save HuangFJ/0b5e0dd716d4ce4971a1 to your computer and use it in GitHub Desktop.
Save HuangFJ/0b5e0dd716d4ce4971a1 to your computer and use it in GitHub Desktop.
NAT Traversal via UPnP Port Mapping
# NAT Traversal via UPnP Port Mapping
# Written by Nikos Fotoulis <nikofot at gmx.com>
# This code is public domain.
#
# Tested on Thomson TG585v7 modem router.
# UPnP is hairy. May not work with other routers
# Feedback is welcome.
#
# For how to add a multicast address on Mac OS, please refer to
# http://blogs.agilefaqs.com/2009/11/08/enabling-multicast-on-your-macos-unix/
import re, thread, socket, traceback as tb, random
from time import sleep
from urlparse import urlparse
from urllib import urlopen
import urllib2
# Diagnostic switches read by DiscoverUPnP(): VERBOSE reports each gateway
# found, VVERBOSE additionally prints a "+" for every datagram received.
VERBOSE = False
VVERBOSE = False

# Filled in by DiscoverUPnP(): UPNPS becomes the gateway table and
# DEFAULT_ADDR the external address of the gateway when exactly one is found.
DEFAULT_ADDR = None
UPNPS = None

# Matchers applied to the raw text of SSDP replies: the advertised WAN
# connection service type, and the device description URL.
rWANIP = re.compile(r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.IGNORECASE).search
rLOCATION = re.compile(r"LoCaTiON:([^\n]+)", re.IGNORECASE).search
def rTAG(t):
    """Compile a regex capturing the text between <t> and </t>.

    The match is case-insensitive, non-greedy, and may span several lines.
    """
    pattern = "<%s>(.+?)</%s>" % (t, t)
    return re.compile(pattern, re.IGNORECASE | re.DOTALL)
# Returns the inner text of every <service> element in a device description.
rSERVICE = rTAG("service").findall

# For each XML tag below, publish a module-level extractor named
# "r" + TAG in upper case (a leading "New" is dropped from the name only),
# e.g. rCONTROLURL, rEXTERNALIPADDRESS, rLEASEDURATION.
for tag in ("controlURL", "URLBase", "NewExternalIPAddress", "NewLeaseDuration",
            "NewProtocol", "NewInternalClient", "NewExternalPort", "NewInternalPort"):
    # The matcher is bound as a default argument so that each extractor keeps
    # its own pattern instead of sharing the loop variable.
    def f(txt, r=rTAG(tag).search):
        """Return the stripped text of the first matching element, or None."""
        found = r(txt)
        if not found:
            return None
        return found.group(1).strip()
    if tag[:3] == "New":
        tag = tag[3:]
    globals()["r%s" % tag.upper()] = f
# Multicast an SSDP search and discover UPnP gateways.
# Returns a dictionary keyed by gateway host (see the docstring for the values).
def DiscoverUPnP():
    """Discover UPnP gateways and record them in the module-level UPNPS dict.

    Sends an SSDP M-SEARCH to 239.255.255.250:1900 and collects the replies
    that advertise a WANIPConnection/WANPPPConnection service.  For each such
    reply the device description is downloaded, the service's control URL is
    looked up, and the gateway is asked for its external IP address.

    Side effects: UPNPS is rebound to a fresh dict mapping
    gateway host -> (external IP, service, control URL, local address used
    to reach the gateway).  DEFAULT_ADDR, DEFAULTGW and DEFAULTIFACE are set
    from the gateway when exactly one was found and are None otherwise.

    Returns UPNPS; it is empty when the search could not be sent or no
    usable gateway answered.
    """
    global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR
    UPNPS = {}
    DEFAULT_ADDR = DEFAULTGW = DEFAULTIFACE = None

    search = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: ssdp:discover\r\nMX: 10\r\nST: ssdp:all\r\n\r\n"
    found = {}  # responder address -> (service, description URL)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        try:
            sock.sendto(search, ("239.255.255.250", 1900))
        except socket.error:
            print("UPnP gateways unreachable")
            return UPNPS
        # Wait long for the first reply, then halve the wait after every
        # datagram so the loop ends soon after the replies dry up.
        timeout = 30
        while True:
            sock.settimeout(timeout)
            try:
                data, addr = sock.recvfrom(4096)
            except socket.error:  # includes socket.timeout
                break
            timeout = max(timeout * 0.5, 0.01)
            wan = rWANIP(data)
            loc = wan and rLOCATION(data)
            if loc:
                service = wan.group(1)
                location = loc.group(1).strip()
                if VERBOSE:
                    print("server: %s supports %s at %s" % (addr, service, location))
                found[addr] = service, location
            if VVERBOSE:
                print("+")
    finally:
        sock.close()

    for service, location in found.values():
        server, _, port = urlparse(location).netloc.partition(":")
        port = port or "80"
        # A single unreachable description must not abort the whole discovery.
        try:
            page = urlopen(location)
            try:
                data = page.read()
            finally:
                page.close()
        except IOError:
            print("Could not fetch device description: %s" % (location,))
            continue
        URLBase = rURLBASE(data) or "http://%s:%s" % (server, port)
        controlURL = None
        for x in rSERVICE(data):
            if service in x:
                controlURL = rCONTROLURL(x)
                break
        if not controlURL:
            print("No controlURL found for server: %s" % (server,))
            continue
        addr = GetExternalIP(service, URLBase + controlURL)
        if not addr:
            continue
        # Connect to the gateway only to learn which local address is used
        # to reach it; that address is later used as the mapping target.
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            probe.connect((server, int(port)))
            thishost = probe.getsockname()[0]
        except (socket.error, ValueError):
            print("Could not connect to gateway: %s" % (server,))
            continue
        finally:
            probe.close()
        UPNPS[server] = addr, service, URLBase + controlURL, thishost
        if VERBOSE:
            print("for server: %s controlURL: %s" % (server, controlURL))

    # set defaults
    if len(UPNPS) == 1:
        gateway, entry = list(UPNPS.items())[0]
        DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = entry[0], gateway, entry[3]
    elif UPNPS:
        print("Multiple UPnP gateways!")
    return UPNPS
# generic request POST data
def envelope(request, service, **kw):
    """Build the SOAP envelope for a UPnP action.

    request: action name; used as the name of the body element.
    service: service type and version, appended to
        "urn:schemas-upnp-org:service:" to form the element's namespace.
    kw: action arguments; each becomes a <name>value</name> child element.
        Values are formatted with %s and XML-escaped so that characters
        such as "&" or "<" cannot produce a malformed document.

    Returns the envelope as a string.
    """
    from xml.sax.saxutils import escape

    arguments = "\n".join(
        ["<%s>%s</%s>" % (name, escape("%s" % (value,)), name)
         for name, value in kw.items()])
    return """<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:%s xmlns:u="urn:schemas-upnp-org:service:%s">
""" % (request, service) + arguments + """ </u:%s>
</s:Body>
</s:Envelope>""" % request
def Request(service, URL, request, **kw):
    """POST a SOAP action to a UPnP control URL and return the raw reply.

    service: service type and version, e.g. as found by discovery.
    URL: control URL of the service.
    request: action name.
    kw: action arguments, forwarded to envelope().

    Returns the response body, or None when the request fails for any
    reason (connection problems as well as HTTP error statuses).
    """
    req = urllib2.Request(URL)
    req.add_header("content-type", 'text/xml; charset="utf-8"')
    req.add_header("SOAPACTION", '"urn:schemas-upnp-org:service:%s#%s"' % (service, request))
    req.add_data(envelope(request, service, **kw))
    try:
        response = urllib2.build_opener().open(req)
    except Exception:
        # Deliberately best-effort: callers treat "no answer" as failure.
        # Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
        return None
    try:
        return response.read()
    except Exception:
        return None
    finally:
        response.close()
def GetExternalIP(service, URL):
    """Ask a gateway for its external IP address.

    Issues the GetExternalIPAddress action and extracts the address from the
    reply.  Prints a notice and returns a false value when the request failed
    or the reply carried no address.
    """
    reply = Request(service, URL, "GetExternalIPAddress")
    if reply:
        external = rEXTERNALIPADDRESS(reply)
    else:
        # Hand back the (false) reply itself when there is nothing to parse.
        external = reply
    if not external:
        print("Couldn't get external IP address!")
    return external
## The 3 basic actions of UPnP: list entries, add a mapping, delete a mapping
## Notes (tested on Thomson TG585v7):
## - Sometimes AddMapping returns a fail code (500) but the
## mapping *is* done, and that can be seen by listing the entries (?!)
## So, the only way to be sure is to: list entries, add mapping, list entries
## and see the difference.
## - Returned LeaseDuration seems to be in deci-seconds
def getEntries(service, URL):
    """Yield the raw reply for each port mapping entry of a gateway.

    Entries are requested by index, starting at 0, with the
    GetGenericPortMappingEntry action; iteration ends at the first index
    for which the request returns nothing.
    """
    index = 0
    reply = Request(service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
    while reply:
        yield reply
        index += 1
        reply = Request(service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
def listMappings(gw=None):
    """Return the TCP port mappings of a gateway that target this host.

    gw: key of the gateway in UPNPS; DEFAULTGW is used when omitted.

    Returns a list of (external port, internal port, lease) tuples, where
    lease is the reported NewLeaseDuration divided by 10.0.  Entries for
    another protocol or another internal client, and entries whose port or
    lease fields are missing or not numeric, are printed and skipped rather
    than raising.
    """
    _, service, URL, iface = UPNPS[gw or DEFAULTGW]
    mappings = []
    for entry in getEntries(service, URL):
        fields = None
        if rPROTOCOL(entry) == "TCP" and rINTERNALCLIENT(entry) == iface:
            try:
                fields = (int(rEXTERNALPORT(entry)), int(rINTERNALPORT(entry)),
                          int(rLEASEDURATION(entry)) / 10.0)
            except (TypeError, ValueError):
                # A field was absent (None) or not a number.
                fields = None
        if fields is None:
            print("strange entry response! %s" % (entry,))
        else:
            mappings.append(fields)
    return mappings
def addMapping(local_port, public_port, ttl, gw=None):
    """Map a TCP external port of the gateway to a local port on this host.

    local_port: port on this host that should receive the traffic.
    public_port: external port to open on the gateway.
    ttl: lease duration passed to the gateway as NewLeaseDuration.
    gw: key of the gateway in UPNPS; DEFAULTGW is used when omitted.

    Returns True when the mapping is in place, None otherwise (including
    when the external port is already mapped to a different local port).
    """
    _, service, URL, iface = UPNPS[gw or DEFAULTGW]

    # Refuse to take over an external port that already points elsewhere.
    taken = any(ext == public_port and internal != local_port
                for ext, internal, _lease in listMappings(gw))
    if taken:
        return None

    arguments = dict(NewEnabled="1", NewRemoteHost="", NewLeaseDuration=ttl,
                     NewInternalPort=local_port, NewExternalPort=public_port,
                     NewProtocol="TCP", NewInternalClient=iface,
                     NewPortMappingDescription="IndependNet")
    if Request(service, URL, "AddPortMapping", **arguments):
        return True

    # The gateway's answer cannot be trusted: it may report failure even
    # though the mapping was created, so look for it in the listing.
    created = any(ext == public_port and internal == local_port
                  for ext, internal, _lease in listMappings(gw))
    if created:
        return True
    return None
def delMapping(public_port, gw=None):
    """Remove TCP port mappings from a gateway.

    public_port: the external port to unmap, or the string "all" to remove
        every mapping returned by listMappings() for this gateway.
    gw: key of the gateway in UPNPS; DEFAULTGW is used when omitted.
    """
    _, service, URL, _ = UPNPS[gw or DEFAULTGW]
    if public_port == "all":
        doomed = [ext for ext, _internal, _lease in listMappings(gw)]
    else:
        doomed = [public_port]
    for ext in doomed:
        Request(service, URL, "DeletePortMapping",
                NewRemoteHost="", NewExternalPort=ext, NewProtocol="TCP")
##
## Socket compatible interface for accepting connections on an external port.
## Does mapping keepalive every 60sec to make sure the mapping is not kept
## indefinitely if our application crashes and doesn't manage to remove it.
##
# Lease requested for mappings created by Accept() and renewed by keepalive.
LEASE_DURATION = 60


class UPnPError(Exception):
    """Raised when no gateway is available or a port mapping cannot be made."""


def Accept(port):
    """Open a listening socket reachable through an external gateway port.

    port: external port to map; a false value picks a random port between
        2000 and 60000.

    Runs gateway discovery first if it has not been run yet, binds a socket
    to an OS-chosen port on DEFAULTIFACE, maps the external port to it for
    LEASE_DURATION and starts listening.

    Returns an Acceptor wrapping the listening socket.
    Raises UPnPError when no gateway is known or the mapping fails.
    """
    if not port:
        port = random.randint(2000, 60000)
    if UPNPS is None:
        DiscoverUPnP()
    if not UPNPS:
        raise UPnPError("No UPnP gateway found. Can't listen ouside the modem")
    s = socket.socket()
    ready = False
    try:
        s.bind((DEFAULTIFACE, 0))
        inport = s.getsockname()[1]
        if not addMapping(inport, port, LEASE_DURATION):
            raise UPnPError("Port Mapping to external port %i Failed" % port)
        s.listen(2)
        ready = True
    finally:
        # Do not leak the socket when binding or mapping failed.
        if not ready:
            s.close()
    return Acceptor(s, port, inport)
class Acceptor:
    """Listening socket exposed on an external port through a UPnP mapping.

    Iterating over the object yields the results of sock.accept() for as
    long as the mapping is considered active.  A background thread started
    by the constructor renews the mapping before its lease runs out.
    """

    def __init__(self, sock, eport, iport):
        """Store the socket and ports and start the keepalive thread.

        sock: listening socket bound to iport.
        eport: external (gateway) port; also available as self.port.
        iport: internal (local) port.
        """
        self.sock, self.eport, self.iport = sock, eport, iport
        self.port = eport
        self.active = True
        thread.start_new_thread(self.keepalive, ())

    def __iter__(self):
        while self.active:
            yield self.sock.accept()

    def _lease_ttl(self):
        """Return the lease reported for our mapping, or None if it is not listed."""
        for eport, iport, ttl in listMappings():
            if eport == self.eport and iport == self.iport:
                return ttl
        # Our mapping is absent: do not report some other mapping's lease.
        return None

    def keepalive(self):
        """Renew the mapping until the acceptor is deactivated.

        Sleeps until shortly before the listed lease ends (or briefly when
        the mapping is not listed), then re-adds the mapping.  When renewal
        fails and the mapping was not listed either, the acceptor is marked
        inactive and the loop ends.
        """
        while self.active:
            ttl = self._lease_ttl()
            st = 0.1
            if ttl is not None:
                st = max(ttl - 0.1, 0.1)
            sleep(st)
            if not self.active:
                break
            if not addMapping(self.iport, self.eport, LEASE_DURATION):
                if ttl is None:
                    self.active = False
                print("Failed to Keepalive the lease")

    def __del__(self):
        self.active = False
        self.sock.close()
        delMapping(self.eport)
## main. UPnP manager & testing
USAGE = """UPnP NAT Traversal (port mapping) test
Usage: python upnp.py [-gw gw] {list|bind|del} <arguments>
upnp list
list mappings
upnp bind internal-port external-port time-to-live
map public port to local port for some time
upnp del external-port|"all"
remove a port mapping
upnp
discover gateways and external IP addresses
Common options:
-gw : select UPnP gateway (if more than one -- NOT IMPLEMENTED)
"""
# Command-line front end: discover gateways, report them, then run the
# requested list/bind/del command against the default gateway.
if __name__ == "__main__":
    import sys
    args = sys.argv[1:]
    if "--help" in args:
        print(USAGE)
        sys.exit()
    VERBOSE = True
    print("Discovering UPnP gateways...")
    DiscoverUPnP()
    for gw, (ip, service, URL, iface) in UPNPS.items():
        print("External IP: %s" % (ip,))
        print("\tgateway: %s" % (gw,))
        print("\tservice: %s" % (service,))
        print("\tcontrol URL: %s" % (URL,))
        print("\tinterface: %s" % (iface,))
    if not UPNPS:
        sys.exit("No UPnP gateway found")
    if not args:
        sys.exit()
    cmd = args.pop(0)
    gw = None
    if cmd == "list":
        print("Port Mappings:")
        # listMappings yields (external, internal, ttl); shown as internal <- external.
        for external, internal, ttl in listMappings(gw):
            print("\t%i <- %i (ttl=%i)" % (internal, external, ttl))
    elif cmd == "bind":
        # Wrong argument count or non-numeric values: show usage, no traceback.
        try:
            iport, eport, ttl = [int(a) for a in args]
        except ValueError:
            print(USAGE)
            sys.exit(2)
        if addMapping(iport, eport, ttl, gw):
            print("OK")
        else:
            print("Failed. Port already used, or implementation error")
    elif cmd == "del":
        if len(args) != 1:
            print(USAGE)
            sys.exit(2)
        eport = args[0]
        delMapping(eport, gw)
    else:
        print(USAGE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment