Skip to content

Instantly share code, notes, and snippets.

@jhass
Last active July 24, 2022 23:15
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhass/f6a7753b52d9dc9e4354 to your computer and use it in GitHub Desktop.
Save jhass/f6a7753b52d9dc9e4354 to your computer and use it in GitHub Desktop.
powerdns remote backend for hashbang.sh
python/
*.log

Setup:

$ virtualenv -p /usr/bin/python2 python
$ source python/bin/activate
$ pip install -r requirements.txt

pdns.conf:

master=yes
# gpgsql on real deployment
launch=gsqlite3,remote
remote-connection-string=pipe:command=/path/to/pdns-hashbang.sh,timeout=2000
gsqlite3-database=/path/to/pdns.db
# Useful for dev
daemon=no
log-level=10
log-dns-details=yes
log-dns-queries=yes
local-port=5353
socket-dir=/home/....
$ sqlite3 pdns.db
# Schema from https://doc.powerdns.com/md/authoritative/backend-gsqlite/
INSERT INTO domains(name, type) VALUES ('hashbang.sh', 'NATIVE');
INSERT INTO records(domain_id, name, type, content) VALUES (1, 'hashbang.sh', 'SOA', 'ns.hashbang.sh. hostmaster.hashbang.sh. 1432461311 10800 3600 604800 1800');
INSERT INTO records(domain_id, name, type, content) VALUES (1, 'hashbang.sh', 'A', '104.131.149.82');

http://www.poweradmin.org/ to manage records from the DB in real deployment

Slave operation:

AXFR only queries the first backend so the options for slave operation are:

  • Deploy custom backend to slave, use database backend with static records as first backend
  • Use only database backend on slave and only remote backend on master, have the remote backend return the static records too.
import logging
import traceback
import pdns.remotebackend
import ldap
TTL = 300
LDAP_URI = "ldap://ldap.hashbang.sh"
CA_STORE = "/etc/ssl/certs/ca-certificates.crt"
DOMAIN = "hashbang.sh"
SOA = "ns.hashbang.sh. hostmaster.hashbang.sh. 1432461311 10800 3600 604800 1800"
logging.basicConfig(filename='pdns-hashbang.log',level=logging.DEBUG)
class HashbangHandler(pdns.remotebackend.Handler):
def __init__(self):
pdns.remotebackend.Handler.__init__(self)
self.ldapConnection = self.connect_to_ldap()
self.userCache = {}
self.hostCache = {}
def connect_to_ldap(self):
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, CA_STORE)
connection = ldap.ldapobject.ReconnectLDAPObject(LDAP_URI, retry_max=10, retry_delay=5)
connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
connection.start_tls_s()
connection.simple_bind_s()
logging.info("Connected to LDAP")
return connection
def lookup_ip_for_user(self, uid):
try:
if uid in self.userCache:
host = self.userCache[uid]
else:
host = self.lookup_field_in_ldap("uid=%s,ou=People,dc=hashbang,dc=sh" % uid, "host")
self.userCache[uid] = host
logging.debug("%s is on host %s" % (uid, host))
if host in self.hostCache:
ip = self.hostCache[host]
else:
ip = self.lookup_field_in_ldap("cn=%s,ou=Servers,dc=hashbang,dc=sh" % host, "ipHostNumber")
self.hostCache[host] = ip
logging.debug("%s has the IP %s" % (host, ip))
return ip
except ldap.NO_SUCH_OBJECT:
self.log.append("No host or IP for %s" % uid)
logging.debug("No host or IP for user %s" % uid)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.log.append("Failed to process request for user %s" % uid)
logging.exception("Failed to lookup ip for user %s" % uid)
def lookup_field_in_ldap(self, dn, key):
result = self.ldapConnection.search_s(dn, ldap.SCOPE_BASE, "(objectClass=*)", (key,))
return result[0][1][key][0]
def lookup_all_users(self):
result = self.ldapConnection.search_s("ou=People,dc=hashbang,dc=sh",
ldap.SCOPE_ONELEVEL, '(objectClass=*)', ("uid","host"), 0)
entries = []
for entry in result:
uid = entry[1]["uid"][0]
host = entry[1]["host"][0]
self.userCache[uid] = host
ip = self.lookup_ip_for_user(uid)
if ip:
entries.append({"uid": uid, "host": host, "ip": ip})
return entries
def do_lookup(self, args):
logging.debug("Got request for %s %s" %(args['qname'], args['qtype']))
if args["qtype"] not in ["ANY", "A", "SOA"]:
return
qname = args["qname"].split(".", 1)
if len(qname) != 2:
return
subdomain, domain = qname
if domain != DOMAIN and args["qname"] != DOMAIN:
return
if args["qtype"] == "SOA":
record = self.record_prio_ttl(DOMAIN, "SOA", SOA, 0, TTL)
else:
if subdomain == "*" or domain == "sh":
return
ip = self.lookup_ip_for_user(subdomain)
if ip:
record = self.record_prio_ttl(args['qname'], "A", ip, 0, TTL)
else:
return
self.result = [record]
logging.debug(record)
def do_list(self, args):
logging.info("Got AXFR for %s" % args["zonename"])
if args["zonename"] != DOMAIN:
return
self.result = [self.record_prio_ttl("%s.%s" % (result["uid"], DOMAIN), "A", result["ip"], 0, TTL)
for result in self.lookup_all_users()]
logging.debug("AXFR reply %s" % self.result)
if __name__ == '__main__':
try:
pdns.remotebackend.PipeConnector(HashbangHandler).run()
except:
logging.exception("Crash")
raise
#!/bin/bash
source $(dirname $(readlink -f $0))/python/bin/activate
exec python pdns-hashbang.py
python-ldap
Pdns_Remotebackend
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment