Created
July 9, 2011 05:07
-
-
Save ssokolow/1073337 to your computer and use it in GitHub Desktop.
IPv6 Updater for HE.net Tunnels, m0n0wall endpoints, and dynamic IPv4 Addresses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Simple cronscript for updating HE.net when using m0n0wall with a dynamic IP. | |
Auto-detects the default gateway address on Linux when not given. | |
Requires: | |
- Python 2.5+ | |
- LXML (Feel free to patch to use Python stdlib for parsing m0n0wall's output) | |
Installation instructions: | |
0. Download to a machine behind your router. | |
1. Put it somewhere and set it up as a cronjob. | |
3. Run --dump-config as the same user your cronjob will use. | |
4. Edit the generated state file to set passwords. | |
Certain rules apply to the state file: | |
1. Because it will also store the IP address during the last announce, it | |
must be writable. | |
2. Because it contains passwords, it and its containing folder must be owned | |
by the same user and group as the cronjob and must be chmodded 600. | |
3. When dumped, autodetectable values will be set to None. To override this | |
behaviour and freeze current values into the config, specify -d twice. | |
Yeah, the serialization is kind of silly. So sue me. I needed something quick | |
and it's flexible enough that I can fix it later with minimal hassle. | |
""" | |
__appname__ = "Dynamic IP helper for HE.net tunnel endpoints" | |
__author__ = "Stephan Sokolow (deitarion/SSokolow)" | |
__version__ = "0.1" | |
__license__ = "MIT" | |
import errno, json, logging, os, socket, stat, struct, sys, urllib2, urlparse | |
from urllib import urlencode | |
from lxml.html import fromstring as html_fromstring | |
# See http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html | |
XDG_DATA_DIR = os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')) | |
XDG_CONFIG_DIR = os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')) | |
STATE_FILE = os.path.join(XDG_CONFIG_DIR, 'upd_ipv6.json') | |
# Define the User-Agent header separately so it can be reused in other openers | |
UA_Header = ('User-Agent', '%s/%s +https://gist.github.com/1073337' % (__appname__, __version__)) | |
# Set the User-Agent string by default for all urllib2 requests | |
opener = urllib2.build_opener() | |
opener.addheaders = [UA_Header] | |
urllib2.install_opener(opener) | |
def get_default_gateway_linux(): | |
"""Read the default gateway directly from /proc.""" | |
with open("/proc/net/route") as fh: | |
for line in fh: | |
fields = line.strip().split() | |
if fields[1] != '00000000' or not int(fields[3], 16) & 2: | |
continue | |
#TODO: Find a big-endian machine to test whether "<L" should be "=L" | |
return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16))) | |
class SerializedClass(object): | |
"""Interface template and basic code for self-serializing classes""" | |
SAVED_ATTRS = [] | |
def __init__(self, **kwargs): | |
"""Turn given keys in SAVED_ATTRS into attributes.""" | |
for key in self.SAVED_ATTRS: | |
if key in kwargs: | |
setattr(self, key, kwargs[key]) | |
@classmethod | |
def load(cls, state): | |
"""Instantiate the class from saved state""" | |
return cls(**state) | |
@classmethod | |
def load_simple(cls, all_state, *args, **kwargs): | |
return cls.load(all_state[cls.__name__], *args, **kwargs) | |
def save(self): | |
"""Save the class's state in a form load() will understand""" | |
#FIXME: This omits attributes that exist but have a value of None. | |
return dict((x, getattr(self, x)) for x in self.SAVED_ATTRS if getattr(self, x, None) is not None) | |
@staticmethod | |
def default_config(): | |
"""Generate this class's contribution to the example state/config file. | |
(May include values which will serialize further but fail to | |
instantiate for use as 'you must replace me before running' values) | |
""" | |
raise NotImplementedError("Must be overridden in subclasses") | |
class M0n0wallLookup(SerializedClass): | |
SAVED_ATTRS = ['user', 'password', 'use_https', 'router_ip'] | |
def __init__(self, user, password, use_https=False, router_ip=None, gateway_getter=None): | |
""" | |
@param gateway_getter: Method to call when no router IP is provided. | |
""" | |
loc = locals() | |
loc = dict((key, loc[key]) for key in loc if key in self.SAVED_ATTRS) | |
super(M0n0wallLookup, self).__init__(**loc) | |
self.gateway_getter = gateway_getter | |
#XXX: Verify that it's safe to reuse this under all circumstances | |
self.password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() | |
self.auth_handler = urllib2.HTTPBasicAuthHandler(self.password_mgr) | |
self.url_opener = urllib2.build_opener(self.auth_handler) | |
self.url_opener.addheaders = [UA_Header] | |
@classmethod | |
def load(cls, state, gateway_getter=None): | |
instance = super(M0n0wallLookup, cls).load(state) | |
instance.gateway_getter = gateway_getter | |
return instance | |
@staticmethod | |
def default_config(): | |
return {'user': 'M0n0wall user with access to Status > Interfaces', | |
'password': 'PASSWORD', | |
'use_https': False, | |
'router_ip': None, | |
} | |
def get_wan_ipv4(self): | |
if self.router_ip: | |
router_ip = self.router_ip | |
elif self.gateway_getter: | |
router_ip = self.gateway_getter() | |
logging.debug('Retrieved default gateway IP: %s', router_ip) | |
else: | |
raise ValueError("Neither router IP nor lookup function provided.") | |
# Do all this on every lookup so the default gateway can change safely | |
url = urlparse.urlunparse(('https' if self.use_https else 'http', | |
router_ip, '/status_interfaces.php', '', '', '')) | |
# Use the default realm for the sake of robustness | |
self.password_mgr.add_password(None, url, self.user, self.password) | |
# urllib2 doesn't guarantee that read() will read to the end | |
handle = self.url_opener.open(url) | |
markup = ''.join(x for x in iter(lambda: handle.read(), '')) | |
handle.close() | |
#XXX: If anyone knows a cleaner way, please tell me | |
dom = html_fromstring(markup) | |
ipv4_title_node = [x for x in dom.findall(".//td[@class='vncellt']") if x.text.strip().lower() == 'ipv4 address'][0] | |
wan_ipv4 = ipv4_title_node.getnext().text.strip().split('/')[0] | |
logging.debug('Retrieved WAN IP from m0n0wall: %s', wan_ipv4) | |
return wan_ipv4 | |
class HETunnelBrokerPinger(SerializedClass): | |
SAVED_ATTRS = ['user', 'md5pass', 'tunnel_id', 'wan_ipv4_addr'] | |
ENDPOINT = 'https://ipv4.tunnelbroker.net/ipv4_end.php' | |
ENDPOINT_PARAMS = { | |
'ip': 'wan_ipv4_addr', | |
'pass': 'md5pass', | |
'apikey': 'user', | |
'tid': 'tunnel_id', | |
} | |
wan_ipv4_addr = None | |
def __init__(self, user, md5pass, tunnel_id, wan_ipv4_addr=None, **kwargs): | |
loc = locals() | |
loc = dict((key, loc[key]) for key in loc if key in self.SAVED_ATTRS) | |
super(HETunnelBrokerPinger, self).__init__(**loc) | |
@staticmethod | |
def default_config(): | |
return {'user': 'TunnelBroker.net username', | |
'md5pass': 'Output of hashlib.md5("your password").hexdigest()', | |
'tunnel_id': 'See TunnelBroker.net tunnel information page', | |
'wan_ipv4_addr': None, | |
} | |
def update(self, wan_ipv4_addr): | |
if self.wan_ipv4_addr and self.wan_ipv4_addr == wan_ipv4_addr: | |
return False #Only update when necessary. | |
else: | |
self.wan_ipv4_addr = wan_ipv4_addr | |
params = dict((x, getattr(self, y)) for x, y in self.ENDPOINT_PARAMS.items()) | |
url = urlparse.urljoin(self.ENDPOINT, "?" + urlencode(params)) | |
logging.debug('Generated TunnelBroker.net update url: %s', url) | |
response = urllib2.urlopen(url).read() | |
if response[0] == '+': | |
logging.debug('Received response: %s', response) | |
else: | |
logging.warning('Endpoint returned an error response: %s', response) | |
return True #TODO: Decide what to do with the +/- endpoint return value | |
def load_state(path): | |
""":todo: Use exceptions here rather than sys.exit()""" | |
if not os.path.isfile(path): | |
logging.critical("State file not found. Please use --dump-config and edit manually.") | |
sys.exit(1) | |
fstat = os.stat(path) | |
if os.name == 'posix' and not fstat.st_uid == os.getuid(): | |
logging.critical("State file owner UID does not match process UID: %s", path) | |
sys.exit(2) | |
if fstat.st_mode & (stat.S_IRWXG | stat.S_IRWXO) != 0: | |
logging.critical("State file is readable by non-owner users: %s", path) | |
sys.exit(2) | |
try: | |
with open(path, 'r') as fh: | |
return json.load(fh) | |
except IOError as e: | |
if e.errno == errno.EACCESS: | |
logging.critical("Access denied while attempting to load state data from %s", path) | |
sys.exit(2) | |
# Not a permission error. | |
logging.critical("Unknown error while attempting to load state data from %s", path) | |
sys.exit(2) | |
def save_state(path, state): | |
objs_out = [] | |
for obj in state: | |
try: | |
# Classes come first, because instances apparently have no __name__ | |
obj_pair = (obj.__name__, obj.default_config()) | |
except AttributeError: | |
obj_pair = (obj.__class__.__name__, obj.save()) | |
objs_out.append(obj_pair) | |
state_str = json.dumps(dict(objs_out), indent=True) | |
# Open after serializing to prevent serialization errors from leaving an | |
# empty state file. | |
with open(path, 'w') as fh: | |
fh.write(state_str) | |
os.chmod(path, 0600) | |
# -- Code Here -- | |
if __name__ == '__main__': | |
from optparse import OptionParser | |
parser = OptionParser(description=__doc__.split('.\n')[0], version="%%prog v%s" % __version__) | |
parser.add_option('-d', '--dump-config', action="count", | |
dest="dump_config", default=False, | |
help="Write an example config file to disk to be edited.") | |
parser.add_option('-v', '--verbose', action="count", dest="verbose", | |
default=2, help="Increase the verbosity. Can be used twice for extra effect.") | |
parser.add_option('-q', '--quiet', action="count", dest="quiet", | |
default=0, help="Decrease the verbosity. Can be used twice for extra effect.") | |
opts, args = parser.parse_args() | |
# Set up clean logging to stderr | |
log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG] | |
opts.verbose = min(opts.verbose - opts.quiet, len(log_levels) - 1) | |
opts.verbose = max(opts.verbose, 0) | |
logging.basicConfig(level=log_levels[opts.verbose], | |
format='%(asctime)s %(levelname)s %(message)s') | |
opts, args = parser.parse_args() | |
if opts.dump_config: | |
save_state(STATE_FILE, [M0n0wallLookup, HETunnelBrokerPinger]) | |
print "Default state file written to %s" % STATE_FILE | |
print "Please edit as necessary." | |
sys.exit() | |
state = load_state(STATE_FILE) | |
lookup = M0n0wallLookup.load_simple(state, get_default_gateway_linux) | |
pinger = HETunnelBrokerPinger.load_simple(state) | |
wan_ipv4 = lookup.get_wan_ipv4() | |
if pinger.wan_ipv4_addr == wan_ipv4: | |
logging.info("IPv4 Unchanged: %s", wan_ipv4) | |
sys.exit() | |
else: | |
logging.info("Updating tunnel endpoint from %s to %s", pinger.wan_ipv4_addr, wan_ipv4) | |
pinger.update(wan_ipv4) | |
save_state(STATE_FILE, [lookup, pinger]) | |
logging.debug("Updated state written to %s", STATE_FILE) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment