Skip to content

Instantly share code, notes, and snippets.

@vpag
Last active May 8, 2021 21:34
Show Gist options
  • Save vpag/46473146849e1cf28ea1d1b1d5f8029a to your computer and use it in GitHub Desktop.
Save vpag/46473146849e1cf28ea1d1b1d5f8029a to your computer and use it in GitHub Desktop.
Unbound python demo module, add route on DNS reply
"""Unbound python demo module, add route on DNS reply.
Related article: https://vp.ag/posts/2021-04-15-dns-based-routing/
Reference: https://nlnetlabs.nl/documentation/unbound/pythonmod/
"""
def init(id, cfg):
return True
def deinit(id):
return True
def inform_super(id, qstate, superqstate, qdata):
return True
def add_route(qstate):
def rr_unpack(rr_raw, rtype='RAW'):
## Ref: https://nlnetlabs.nl/documentation/unbound/doxygen/structpacked__rrset__data.html
rlen = int.from_bytes(rr_raw[:2], byteorder='big')
return {
'IP': lambda b: '.'.join(map(str, b)) if rlen == 4 else b,
'RAW': lambda b: b
}.get(rtype, 'RAW')(rr_raw[2:rlen+2])
if qstate.return_msg.rep:
for i in range(qstate.return_msg.rep.rrset_count):
# rrs = qstate.return_msg.rep.rrsets[i]
rrs_k = qstate.return_msg.rep.rrsets[i].rk
rrs_d = qstate.return_msg.rep.rrsets[i].entry.data
# log_info('{}\t {}, flags: {:04X}'.format(
# i, rrs_k.dname_str, rrs_k.flags))
# log_info('\t type: {} ({}) class: {} ({})'.format(
# rrs_k.type_str, ntohs(rrs_k.type), rrs_k.rrset_class_str, ntohs(rrs_k.rrset_class)))
for j in range(rrs_d.count):
rrl = rrs_d.rr_len[j]
rrd = rrs_d.rr_data[j][0:rrl]
# log_info('\t {:3}: {:3} {}'.format(j, rrl, rr_unpack(rrd, 'IP' if rrs_k.rrset_class_str == 'A' else 'RAW')))
if (rrs_k.rrset_class_str, rrs_k.type_str) == ('IN', 'A'):
log_info('# ip.route("add", dst="{}/32", gateway="...") # e.g. pyroute2'.format(rr_unpack(rrd, 'IP')))
def operate(id, event, qstate, qdata):
# log_info("---------\npythonmod: operate called, id: %d, event:%s" % (id, strmodulevent(event)))
if (event == MODULE_EVENT_NEW) or (event == MODULE_EVENT_PASS):
## Pass the query to validator
qstate.ext_state[id] = MODULE_WAIT_MODULE
return True
if event == MODULE_EVENT_MODDONE:
# log_info("pythonmod: iterator module done")
if not qstate.return_msg:
qstate.ext_state[id] = MODULE_FINISHED
return True
## Process the response
qdn = qstate.qinfo.qname_str
if any(qdn.endswith('linkedin.com.'),
'microsoft' in qdn,
False and 'Other query-based condition(s)'):
add_route(qstate)
qstate.ext_state[id] = MODULE_FINISHED
return True
log_err("pythonmod: bad event")
qstate.ext_state[id] = MODULE_ERROR
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment