Last active
August 11, 2017 18:34
-
-
Save ihciah/a229c21f026616b66013e556a1d07efa to your computer and use it in GitHub Desktop.
Simple Dynamic DNS Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# __author__="ihciah" | |
# Need twisted and pyopenssl | |
from twisted.web import resource | |
from twisted.web import server as webserver | |
from twisted.internet import reactor, defer | |
from twisted.names import client, dns, error, server | |
from OpenSSL.SSL import Context, TLSv1_METHOD | |
import hmac, base64, struct, hashlib, time | |
KEY = "AABBCCDDAABBCCDD" | |
DOMAIN = "dyn.ihc.im" | |
HTTP_PORT = 23300 | |
class OTP: | |
def __init__(self, secret): | |
self.secret = secret | |
def get_hotp_token(self, intervals_no): | |
key = base64.b32decode(self.secret, True) | |
msg = struct.pack(">Q", intervals_no) | |
h = hmac.new(key, msg, hashlib.sha1).digest() | |
o = ord(h[19]) & 15 | |
h = (struct.unpack(">I", h[o:o+4])[0] & 0x7fffffff) % 1000000 | |
return str(h).zfill(6) | |
def get_totp_token(self, t=time.time()): | |
return self.get_hotp_token(int(t)//30) | |
def validate(self, authcode): | |
valid_keys = [self.get_totp_token(time.time() + t) for t in (-30, 0, 30)] | |
return reduce(lambda x, k: x or k == authcode, valid_keys, False) | |
class HTTPServer(resource.Resource): | |
isLeaf = True | |
def __init__(self, resolver): | |
self.resolver = resolver | |
def validate(self, authcode): | |
otp = OTP(KEY) | |
return otp.validate(authcode) | |
def render_GET(self, request): | |
try: | |
if request.uri == '/ddns': | |
auth = request.getHeader('Auth') | |
ip = request.getHeader('IP') | |
if auth and ip and self.validate(auth): | |
self.resolver.myip = str(ip) | |
return "OK!" | |
request.setResponseCode(403) | |
return "403 Forbidden" | |
except: | |
pass | |
class DNSResolver(object): | |
def __init__(self): | |
self.mydomain = DOMAIN | |
self.myip = "10.10.10.10" | |
def _CheckQuery(self, query): | |
if query.type == dns.A: | |
return query.name.name == self.mydomain | |
return False | |
def _Response(self, query): | |
name = query.name.name | |
answer = dns.RRHeader( | |
name=name, | |
payload=dns.Record_A(b'%s' % self.myip, 0)) | |
answers = [answer] | |
authority = [] | |
additional = [] | |
return answers, authority, additional | |
def query(self, query, timeout=None): | |
try: | |
if self._CheckQuery(query): | |
return defer.succeed(self._Response(query)) | |
else: | |
return defer.fail(error.DomainError()) | |
except: | |
pass | |
class ContextFactory(): | |
def __init__(self, context): | |
self.context = context | |
def getContext(self): | |
return self.context | |
def main(): | |
cert = "/root/crt" | |
key = "/root/key" | |
resolver = DNSResolver() | |
factory = server.DNSServerFactory( | |
clients=[resolver] | |
) | |
protocol = dns.DNSDatagramProtocol(controller=factory) | |
httpserver = webserver.Site(HTTPServer(resolver)) | |
context = Context(TLSv1_METHOD) | |
context.use_certificate_chain_file(cert) | |
context.use_privatekey_file(key) | |
reactor.listenUDP(53, protocol) | |
reactor.listenSSL(HTTP_PORT, httpserver, ContextFactory(context)) | |
reactor.run() | |
if __name__ == '__main__': | |
try: | |
main() | |
except: | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hmac, base64, struct, hashlib, time | |
import requests, sys | |
KEY = "AABBCCDDAABBCCDD" | |
SERVER = "https://ddns.ihc.im:23300/ddns" | |
class OTP: | |
def __init__(self, secret): | |
self.secret = secret | |
def get_hotp_token(self, intervals_no): | |
key = base64.b32decode(self.secret, True) | |
msg = struct.pack(">Q", intervals_no) | |
h = hmac.new(key, msg, hashlib.sha1).digest() | |
o = ord(h[19]) & 15 | |
h = (struct.unpack(">I", h[o:o+4])[0] & 0x7fffffff) % 1000000 | |
return str(h).zfill(6) | |
def get_totp_token(self, t=time.time()): | |
return self.get_hotp_token(int(t)//30) | |
def main(): | |
if len(sys.argv) != 2: | |
print "Usage: %s IP_ADDRESS" % sys.argv[0] | |
return | |
req = requests.get(SERVER, headers={"Auth": OTP(KEY).get_totp_token(), "IP": sys.argv[1]}) | |
print req.content | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A new version is updated to https://github.com/ihciah/simple-ddns