Skip to content

Instantly share code, notes, and snippets.

@vulpicastor
Last active Apr 26, 2017
Embed
What would you like to do?
A script to check the SOA serial numbers of all nameservers of a domain and notify via Zephyr
#!/usr/bin/env python3
from __future__ import print_function
import sys
import time
import dns.resolver, dns.exception
from random import randrange
import zwrite
# Keyword arguments forwarded unchanged to every zwrite.zwrite() call.
DEFAULT_ARGS = dict(
    classname="test",
    instance="dns",
    sender="dns_bot",
    opcode="auto",
)

# One report line: nameserver padded to 15 columns, then its SOA serial.
# The "@b(...)" wrapper is presumably Zephyr bold markup -- TODO confirm.
TEMPLATE = "{:15} SOA Serial: @b({})"

# Seconds to sleep between polling rounds that complete without a timeout.
DEFAULT_INTERVAL = 300

# Base (in seconds) of the upper bound for the randomized exponential
# back-off applied after a DNS timeout.
BACKOFF_START = 10
def find_nameservers(domain):
    """Return the targets of the NS records published for *domain*.

    Args:
        domain: Name to look up with an "NS" query.

    Returns:
        A list holding the ``target`` attribute of every NS record in the
        answer, in answer order.

    Raises:
        dns.resolver.NoNameservers: re-raised after printing an error line.
        Any other exception from the query propagates untouched.
    """
    try:
        ns_records = dns.resolver.query(domain, "NS")
    except dns.resolver.NoNameservers:
        # Leave a human-readable trace, then let the caller decide what to do.
        print("Error: Cannot find name servers for {}.".format(domain))
        raise
    return [record.target for record in ns_records]
def find_ips(domains):
    """Resolve every name in *domains* and collect the resulting addresses.

    Args:
        domains: Iterable of names; each is looked up with the resolver's
            default record type.

    Returns:
        A flat list with the ``address`` of every record of every answer,
        grouped in the order the names were given.
    """
    return [
        record.address
        for name in domains
        for record in dns.resolver.query(name)
    ]
def spawn_resolvers(nameservers):
    """Create one resolver per nameserver, each pinned to that single server.

    Args:
        nameservers: Iterable of nameserver addresses.

    Returns:
        A list of ``dns.resolver.Resolver`` objects, in input order, whose
        ``nameservers`` attribute is a one-element list with the address.
    """
    pool = []
    for address in nameservers:
        resolver = dns.resolver.Resolver()
        # Overwrite the server list so queries go to this server only.
        resolver.nameservers = [address]
        pool.append(resolver)
    return pool
def serials_gt(a, b):
    """Return True when serial *a* is newer than serial *b* (RFC 1982).

    SOA serials are 32-bit counters that wrap around, so a plain integer
    comparison misorders them across the wrap (1 is newer than 4294967295).
    Following RFC 1982 serial number arithmetic, *a* is greater than *b*
    when the forward distance from *b* to *a*, taken modulo 2**32, lies
    strictly between 0 and 2**31.

    Args:
        a: Candidate serial; anything ``int()`` accepts.
        b: Reference serial; anything ``int()`` accepts.

    Returns:
        True if *a* is greater than *b* in serial-number arithmetic. False
        when they are equal, when *a* is older, or when the two are exactly
        2**31 apart (RFC 1982 leaves that comparison undefined).

    Raises:
        ValueError / TypeError: if either argument cannot be converted by
            ``int()``.
    """
    modulus = 2 ** 32
    half_range = 2 ** 31
    forward_distance = (int(a) - int(b)) % modulus
    return 0 < forward_distance < half_range
def serials_eq(a, b):
    """Return True when *a* and *b* denote the same serial number.

    Both arguments are converted with ``int()`` first, so a string and an
    integer holding the same value compare equal.
    """
    left = int(a)
    right = int(b)
    return left == right
def check_soa_serial(domain, resolvers, serials):
    """Poll each resolver for the SOA serial of *domain* and build a report.

    Each server's serial is compared with the value stored in *serials* and
    classified as UPDATE (newer), UNCHANGED (equal) or REGRESSION (older).
    A server whose query raises ``dns.resolver.NoNameservers`` is reported
    as UNKNOWN and keeps its stored serial.

    UNCHANGED lines are only included when at least one server in the same
    round reported UPDATE or REGRESSION. That decision is taken after all
    servers have been polled, so the report does not depend on the order in
    which the resolvers are listed.

    Args:
        domain: Zone name whose SOA record is queried.
        resolvers: Iterable of resolver objects; the first entry of each
            one's ``nameservers`` list identifies the server, both as the
            key into *serials* and as the label in the report.
        serials: Mapping of server -> last serial seen. Updated in place,
            after all servers were polled, with the serials obtained in
            this round.

    Returns:
        The report lines joined with newlines, in resolver order; an empty
        string when there is nothing to report.

    Raises:
        KeyError: a polled server has no entry in *serials*.
        Any query exception other than ``dns.resolver.NoNameservers``
        propagates; *serials* is left unmodified in that case.
    """
    # Each row is (text, only_if_changed); filtering happens after the loop.
    rows = []
    new_serials = {}
    changed = False
    for resolver in resolvers:
        server = resolver.nameservers[0]
        try:
            serial = resolver.query(domain, "SOA")[0].serial
        except dns.resolver.NoNameservers:
            rows.append((TEMPLATE.format(server, "@color(red)UNKNOWN"), False))
            continue

        previous = serials[server]
        line = TEMPLATE.format(server, serial)
        if serials_gt(serial, previous):
            rows.append((line + " @b(@color(green)UPDATE)", False))
            changed = True
        elif serials_eq(serial, previous):
            rows.append((line + " @b(@color(yellow)UNCHANGED)", True))
        else:
            rows.append((line + " @b(@color(red)REGRESSION)", False))
            changed = True
        new_serials[server] = serial

    serials.update(new_serials)
    return "\n".join(
        text for text, only_if_changed in rows
        if changed or not only_if_changed
    )
def main():
    """Monitor the SOA serial of a domain on all of its nameservers.

    Command line: ``DOMAIN [NAMESERVER_IP ...]``. The nameservers are
    discovered through NS and address lookups for DOMAIN. The optional
    NAMESERVER_IP arguments are used only when that discovery fails; without
    them the failure is sent through zwrite and re-raised.

    After setup the function loops forever. Each round calls
    check_soa_serial() and sends any non-empty report through zwrite, then
    sleeps DEFAULT_INTERVAL seconds. A DNS timeout is announced through
    zwrite and retried after a randomized, exponentially growing delay.
    """
    if len(sys.argv) < 2:
        # Without a domain there is nothing to monitor; explain and stop
        # instead of failing with a bare IndexError.
        print("Usage: {} DOMAIN [NAMESERVER_IP ...]".format(sys.argv[0]),
              file=sys.stderr)
        sys.exit(2)
    domain = sys.argv[1]

    try:
        resolver_ips = find_ips(find_nameservers(domain))
    except (dns.exception.Timeout, dns.resolver.NXDOMAIN, dns.resolver.YXDOMAIN,
            dns.resolver.NoAnswer, dns.resolver.NoNameservers) as err:
        if len(sys.argv) > 2:
            # Discovery failed: fall back to the addresses given by the user.
            resolver_ips = sys.argv[2:]
        else:
            zwrite.zwrite(str(err), zsig=domain, **DEFAULT_ARGS)
            raise

    resolvers = spawn_resolvers(resolver_ips)
    # Seed every server with serial "0" as the baseline for the first round.
    serials = {r.nameservers[0]: "0" for r in resolvers}

    timeout = 0  # number of consecutive rounds that ended in a timeout
    while True:
        try:
            message = check_soa_serial(domain, resolvers, serials)
        except dns.exception.Timeout as err:
            # Draw the delay before composing the notice so that the
            # announced retry time is the one actually slept.
            # NOTE(review): the upper bound doubles without limit on
            # repeated timeouts -- consider capping it.
            wait = randrange(0, BACKOFF_START * 2**timeout)
            timeout += 1
            zwrite.zwrite(str(err) + "\nRetry in {} seconds".format(wait),
                          zsig=domain, **DEFAULT_ARGS)
        else:
            if message:
                zwrite.zwrite(message, zsig=domain, **DEFAULT_ARGS)
            # A successful round resets the back-off.
            wait = DEFAULT_INTERVAL
            timeout = 0
        time.sleep(wait)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment