Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
IPv6 Updater for HE.net Tunnels, m0n0wall endpoints, and dynamic IPv4 Addresses
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Simple cronscript for updating HE.net when using m0n0wall with a dynamic IP.
Auto-detects the default gateway address on Linux when not given.
Requires:
- Python 2.5+
- LXML (Feel free to patch to use Python stdlib for parsing m0n0wall's output)
Installation instructions:
0. Download to a machine behind your router.
1. Put it somewhere and set it up as a cronjob.
3. Run --dump-config as the same user your cronjob will use.
4. Edit the generated state file to set passwords.
Certain rules apply to the state file:
1. Because it will also store the IP address during the last announce, it
must be writable.
2. Because it contains passwords, it and its containing folder must be owned
by the same user and group as the cronjob and must be chmodded 600.
3. When dumped, autodetectable values will be set to None. To override this
behaviour and freeze current values into the config, specify -d twice.
Yeah, the serialization is kind of silly. So sue me. I needed something quick
and it's flexible enough that I can fix it later with minimal hassle.
"""
__appname__ = "Dynamic IP helper for HE.net tunnel endpoints"
__author__ = "Stephan Sokolow (deitarion/SSokolow)"
__version__ = "0.1"
__license__ = "MIT"
import errno, json, logging, os, socket, stat, struct, sys, urllib2, urlparse
from urllib import urlencode
from lxml.html import fromstring as html_fromstring
# See http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
XDG_DATA_DIR = os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share'))
XDG_CONFIG_DIR = os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
STATE_FILE = os.path.join(XDG_CONFIG_DIR, 'upd_ipv6.json')
# Define the User-Agent header separately so it can be reused in other openers
UA_Header = ('User-Agent', '%s/%s +https://gist.github.com/1073337' % (__appname__, __version__))
# Set the User-Agent string by default for all urllib2 requests
opener = urllib2.build_opener()
opener.addheaders = [UA_Header]
urllib2.install_opener(opener)
def get_default_gateway_linux():
"""Read the default gateway directly from /proc."""
with open("/proc/net/route") as fh:
for line in fh:
fields = line.strip().split()
if fields[1] != '00000000' or not int(fields[3], 16) & 2:
continue
#TODO: Find a big-endian machine to test whether "<L" should be "=L"
return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16)))
class SerializedClass(object):
"""Interface template and basic code for self-serializing classes"""
SAVED_ATTRS = []
def __init__(self, **kwargs):
"""Turn given keys in SAVED_ATTRS into attributes."""
for key in self.SAVED_ATTRS:
if key in kwargs:
setattr(self, key, kwargs[key])
@classmethod
def load(cls, state):
"""Instantiate the class from saved state"""
return cls(**state)
@classmethod
def load_simple(cls, all_state, *args, **kwargs):
return cls.load(all_state[cls.__name__], *args, **kwargs)
def save(self):
"""Save the class's state in a form load() will understand"""
#FIXME: This omits attributes that exist but have a value of None.
return dict((x, getattr(self, x)) for x in self.SAVED_ATTRS if getattr(self, x, None) is not None)
@staticmethod
def default_config():
"""Generate this class's contribution to the example state/config file.
(May include values which will serialize further but fail to
instantiate for use as 'you must replace me before running' values)
"""
raise NotImplementedError("Must be overridden in subclasses")
class M0n0wallLookup(SerializedClass):
SAVED_ATTRS = ['user', 'password', 'use_https', 'router_ip']
def __init__(self, user, password, use_https=False, router_ip=None, gateway_getter=None):
"""
@param gateway_getter: Method to call when no router IP is provided.
"""
loc = locals()
loc = dict((key, loc[key]) for key in loc if key in self.SAVED_ATTRS)
super(M0n0wallLookup, self).__init__(**loc)
self.gateway_getter = gateway_getter
#XXX: Verify that it's safe to reuse this under all circumstances
self.password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
self.auth_handler = urllib2.HTTPBasicAuthHandler(self.password_mgr)
self.url_opener = urllib2.build_opener(self.auth_handler)
self.url_opener.addheaders = [UA_Header]
@classmethod
def load(cls, state, gateway_getter=None):
instance = super(M0n0wallLookup, cls).load(state)
instance.gateway_getter = gateway_getter
return instance
@staticmethod
def default_config():
return {'user': 'M0n0wall user with access to Status > Interfaces',
'password': 'PASSWORD',
'use_https': False,
'router_ip': None,
}
def get_wan_ipv4(self):
if self.router_ip:
router_ip = self.router_ip
elif self.gateway_getter:
router_ip = self.gateway_getter()
logging.debug('Retrieved default gateway IP: %s', router_ip)
else:
raise ValueError("Neither router IP nor lookup function provided.")
# Do all this on every lookup so the default gateway can change safely
url = urlparse.urlunparse(('https' if self.use_https else 'http',
router_ip, '/status_interfaces.php', '', '', ''))
# Use the default realm for the sake of robustness
self.password_mgr.add_password(None, url, self.user, self.password)
# urllib2 doesn't guarantee that read() will read to the end
handle = self.url_opener.open(url)
markup = ''.join(x for x in iter(lambda: handle.read(), ''))
handle.close()
#XXX: If anyone knows a cleaner way, please tell me
dom = html_fromstring(markup)
ipv4_title_node = [x for x in dom.findall(".//td[@class='vncellt']") if x.text.strip().lower() == 'ipv4 address'][0]
wan_ipv4 = ipv4_title_node.getnext().text.strip().split('/')[0]
logging.debug('Retrieved WAN IP from m0n0wall: %s', wan_ipv4)
return wan_ipv4
class HETunnelBrokerPinger(SerializedClass):
SAVED_ATTRS = ['user', 'md5pass', 'tunnel_id', 'wan_ipv4_addr']
ENDPOINT = 'https://ipv4.tunnelbroker.net/ipv4_end.php'
ENDPOINT_PARAMS = {
'ip': 'wan_ipv4_addr',
'pass': 'md5pass',
'apikey': 'user',
'tid': 'tunnel_id',
}
wan_ipv4_addr = None
def __init__(self, user, md5pass, tunnel_id, wan_ipv4_addr=None, **kwargs):
loc = locals()
loc = dict((key, loc[key]) for key in loc if key in self.SAVED_ATTRS)
super(HETunnelBrokerPinger, self).__init__(**loc)
@staticmethod
def default_config():
return {'user': 'TunnelBroker.net username',
'md5pass': 'Output of hashlib.md5("your password").hexdigest()',
'tunnel_id': 'See TunnelBroker.net tunnel information page',
'wan_ipv4_addr': None,
}
def update(self, wan_ipv4_addr):
if self.wan_ipv4_addr and self.wan_ipv4_addr == wan_ipv4_addr:
return False #Only update when necessary.
else:
self.wan_ipv4_addr = wan_ipv4_addr
params = dict((x, getattr(self, y)) for x, y in self.ENDPOINT_PARAMS.items())
url = urlparse.urljoin(self.ENDPOINT, "?" + urlencode(params))
logging.debug('Generated TunnelBroker.net update url: %s', url)
response = urllib2.urlopen(url).read()
if response[0] == '+':
logging.debug('Received response: %s', response)
else:
logging.warning('Endpoint returned an error response: %s', response)
return True #TODO: Decide what to do with the +/- endpoint return value
def load_state(path):
""":todo: Use exceptions here rather than sys.exit()"""
if not os.path.isfile(path):
logging.critical("State file not found. Please use --dump-config and edit manually.")
sys.exit(1)
fstat = os.stat(path)
if os.name == 'posix' and not fstat.st_uid == os.getuid():
logging.critical("State file owner UID does not match process UID: %s", path)
sys.exit(2)
if fstat.st_mode & (stat.S_IRWXG | stat.S_IRWXO) != 0:
logging.critical("State file is readable by non-owner users: %s", path)
sys.exit(2)
try:
with open(path, 'r') as fh:
return json.load(fh)
except IOError as e:
if e.errno == errno.EACCESS:
logging.critical("Access denied while attempting to load state data from %s", path)
sys.exit(2)
# Not a permission error.
logging.critical("Unknown error while attempting to load state data from %s", path)
sys.exit(2)
def save_state(path, state):
objs_out = []
for obj in state:
try:
# Classes come first, because instances apparently have no __name__
obj_pair = (obj.__name__, obj.default_config())
except AttributeError:
obj_pair = (obj.__class__.__name__, obj.save())
objs_out.append(obj_pair)
state_str = json.dumps(dict(objs_out), indent=True)
# Open after serializing to prevent serialization errors from leaving an
# empty state file.
with open(path, 'w') as fh:
fh.write(state_str)
os.chmod(path, 0600)
# -- Code Here --
if __name__ == '__main__':
from optparse import OptionParser
parser = OptionParser(description=__doc__.split('.\n')[0], version="%%prog v%s" % __version__)
parser.add_option('-d', '--dump-config', action="count",
dest="dump_config", default=False,
help="Write an example config file to disk to be edited.")
parser.add_option('-v', '--verbose', action="count", dest="verbose",
default=2, help="Increase the verbosity. Can be used twice for extra effect.")
parser.add_option('-q', '--quiet', action="count", dest="quiet",
default=0, help="Decrease the verbosity. Can be used twice for extra effect.")
opts, args = parser.parse_args()
# Set up clean logging to stderr
log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
opts.verbose = min(opts.verbose - opts.quiet, len(log_levels) - 1)
opts.verbose = max(opts.verbose, 0)
logging.basicConfig(level=log_levels[opts.verbose],
format='%(asctime)s %(levelname)s %(message)s')
opts, args = parser.parse_args()
if opts.dump_config:
save_state(STATE_FILE, [M0n0wallLookup, HETunnelBrokerPinger])
print "Default state file written to %s" % STATE_FILE
print "Please edit as necessary."
sys.exit()
state = load_state(STATE_FILE)
lookup = M0n0wallLookup.load_simple(state, get_default_gateway_linux)
pinger = HETunnelBrokerPinger.load_simple(state)
wan_ipv4 = lookup.get_wan_ipv4()
if pinger.wan_ipv4_addr == wan_ipv4:
logging.info("IPv4 Unchanged: %s", wan_ipv4)
sys.exit()
else:
logging.info("Updating tunnel endpoint from %s to %s", pinger.wan_ipv4_addr, wan_ipv4)
pinger.update(wan_ipv4)
save_state(STATE_FILE, [lookup, pinger])
logging.debug("Updated state written to %s", STATE_FILE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.