Last active
December 16, 2015 13:29
-
-
Save cmol/5442464 to your computer and use it in GitHub Desktop.
Script for requesting a port redirection via natPMP. Uses python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Script for requesting a port redirection via natPMP.
Follows the standard described in:
http://tools.ietf.org/html/draft-cheshire-nat-pmp-07#section-3.3
with the exception of hold times: we are not going to wait a little
over 3 minutes just to test for natPMP.
""" | |
import socket | |
import random | |
import errno | |
import os | |
from subprocess import check_output | |
from struct import pack, unpack | |
def create_payload(local_port, external_port=0, lifetime=7200, opcode=1):
    """Create the 12-byte natPMP request for mapping an external port.

    The packet is packed in network byte order as: version (1 byte,
    always 0), opcode (1 byte), reserved (2 bytes, always 0), internal
    port (2 bytes), suggested external port (2 bytes) and requested
    lifetime in seconds (4 bytes).

    int local_port      port on the client machine that receives the traffic
    int external_port   (optional) suggested external port; 0 means that
                        the GW will choose one
    int lifetime        (optional) requested lifetime of the mapping in
                        seconds; 0 asks for the mapping to be destroyed
    int opcode          (optional) 1 maps UDP (default), 2 maps TCP

    return bytes payload
    raises struct.error when a value does not fit into its field
    """
    return pack('>2B3HI', 0, opcode, 0, local_port, external_port, lifetime)
def send_payload(s, payload, gateway):
    """Send an already packed natPMP request to the gateway on port 5351.

    socket s         socket used for sending
    bytes payload    request as produced by create_payload
    string gateway   address of the gateway

    return bool      True when the payload was handed to the socket,
                     False when the connection was refused
    Any other socket error is re-raised to the caller.
    """
    destination = (gateway, 5351)
    try:
        s.sendto(payload, destination)
    except socket.error as exc:
        # A refused connection is reported as a failure rather than an
        # exception; everything else is unexpected and propagates.
        if exc.errno == errno.ECONNREFUSED:
            return False
        raise exc
    return True
def parse_respons(payload):
    """Parse the response from the natPMP device (if any).

    A complete mapping response is 16 bytes in network byte order:
    version (1), opcode (1), result code (2), seconds since epoch (4),
    internal port (2), mapped external port (2) and lifetime (4).

    bytes payload   raw datagram received from the gateway

    return tuple (external_port, lifetime) or False

    False is returned when the result code is non-zero (the gateway speaks
    natPMP but setting the external port failed) and also when the reply is
    too short to hold a full mapping response, instead of raising
    struct.error on a truncated datagram.
    """
    # The first four bytes carry the result code; look at them before
    # requiring the full 16-byte layout so that short error replies are
    # reported as a failure rather than crashing the unpack below.
    if len(payload) < 4:
        return False
    result_code = unpack('>2BH', payload[:4])[2]
    if result_code != 0:
        return False
    if len(payload) < 16:
        return False
    # Only the first 16 bytes belong to the mapping response.
    values = unpack('>2BHI2HI', payload[:16])
    external_port = values[5]
    lifetime = values[6]
    return external_port, lifetime
def determine_gateway():
    """Determine the default gateway of the network, as it is most likely
    here we will find a natPMP enabled device.

    On Linux the gateway is read from `ip route`, on Darwin from
    `netstat -nr`, and on Windows from WMI (requires the `wmi` module).

    return string gateway
    raises RuntimeError if no default gateway could be determined
    """
    default_gateway = None
    if os.name == 'posix':
        # os.uname() returns a record of several fields, so comparing the
        # whole record to a string never matches; the system name is the
        # first field.
        system = os.uname()[0]
        command = None
        if system == 'Linux':
            command = "ip route | awk '/default/ {print $3}'"
        elif system == 'Darwin':
            command = "/usr/sbin/netstat -nr | grep default | awk '{print $2}'"
        if command is not None:
            # Several default routes give several lines; use the first one.
            found = check_output(command, shell=True).decode().split()
            if found:
                default_gateway = found[0]
    elif os.name == 'nt':
        # Use WMI for finding the default gateway if we are on windows
        import wmi
        wmi_obj = wmi.WMI()
        wmi_sql = "select DefaultIPGateway from Win32_NetworkAdapterConfiguration where IPEnabled=TRUE"
        for dev in wmi_obj.query(wmi_sql):
            # Adapters without a gateway report no DefaultIPGateway; skip
            # them instead of failing on the index. As before, the last
            # adapter that has a gateway wins.
            if dev.DefaultIPGateway:
                default_gateway = dev.DefaultIPGateway[0]
    if not default_gateway:
        raise RuntimeError('Could not determine the default gateway')
    return default_gateway
def map_external_port(lport=None, external_port=0, timeout=7200):
    """Try mapping an external port to an internal port via the natPMP spec.

    This will also test if the gateway is capable of doing natPMP (timeout
    based), and determine the default gateway.

    It is highly recommended that the lport is provided and bound in
    advance, as this module will not test if the port is bindable, and will
    not return the lport. When lport is omitted, a random port between 1025
    and 65535 is picked on every call.

    If the timeout is set to 0 the mapping will be destroyed. In this case
    the external port must also be set to 0 and, from the draft, it seems
    that the lport must be the same as at the time of creation.

    The request is sent up to five times, waiting 0.25, 0.5, 1, 2 and 4
    seconds for an answer from the gateway.

    int lport
    int external_port
    int timeout

    return tuple(external_port, timeout) or False
    """
    if lport is None:
        # Chosen per call; a default evaluated in the signature would be
        # fixed once when the function is defined.
        lport = random.randint(1025, 65535)
    gateway = determine_gateway()
    payload = create_payload(lport, external_port, timeout)
    stimeout = .25
    # One socket serves all retries and is closed on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        # Port 0 lets the OS pick a free source port, which avoids a
        # randomly chosen port that happens to be in use already.
        s.bind(('', 0))
        while stimeout < 6:
            s.settimeout(stimeout)
            if send_payload(s, payload, gateway):
                try:
                    data, sender = s.recvfrom(4096)
                except socket.timeout:
                    # No answer within this interval; retry with a longer one.
                    pass
                else:
                    # Only accept answers coming from the gateway itself.
                    if sender[0] == gateway:
                        return parse_respons(data)
            stimeout = stimeout * 2
    return False
if __name__ == '__main__':
    # Run as a script: request a mapping with the default arguments and
    # print the result, either (external_port, lifetime) or False.
    mapping = map_external_port()
    print(mapping)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment