Skip to content

Instantly share code, notes, and snippets.

@bmcculley
Created June 26, 2019 04:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmcculley/cc3f6f536fa780547f1f588ff9b2d801 to your computer and use it in GitHub Desktop.
Save bmcculley/cc3f6f536fa780547f1f588ff9b2d801 to your computer and use it in GitHub Desktop.
An example DNS server using Twisted.
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
An example demonstrating how to create a custom DNS server.

The server will calculate the responses to A queries where the name begins with
the word "workstation".

Other queries will be handled by a fallback resolver.

For example, start the server:

    python doc/names/howto/listings/names/override_server.py

and then query it from another terminal:

    $ dig -p 10053 @localhost workstation1.example.com A +short
    172.0.2.1
"""
from twisted.internet import reactor, defer
from twisted.names import client, dns, error, server
import sys
class DynamicResolver(object):
    """
    A resolver which calculates the answers to certain queries based on the
    query type and name.

    A query is answered here only when its type is A, MX or CNAME and the
    first label of its name is C{_pattern} followed by a decimal number in
    the range 0-255 (for example C{workstation7.example.com}).  Every other
    query fails with L{error.DomainError} so that a resolver chained after
    this one can handle it.
    """

    # Prefix the first label of a query name must carry to be answered here.
    _pattern = 'workstation'
    # First three octets of the addresses handed out in A answers.
    _network = '172.0.2'

    def _parseLastOctet(self, query):
        """
        Extract the numeric suffix of the first label of the query name.

        @param query: The query; only C{query.name.name} (bytes) is read.

        @return: The suffix as an C{int} in the range 0-255, or L{None} when
            the name cannot be decoded, the first label does not start with
            C{_pattern}, the suffix is not made of ASCII digits only, or the
            number does not fit into one IPv4 octet.
        """
        try:
            firstLabel = query.name.name.decode().split('.')[0]
        except UnicodeDecodeError:
            # Undecodable names can never match the (text) pattern.
            return None
        if not firstLabel.startswith(self._pattern):
            return None
        # Slice instead of split: the pattern may occur again in the label.
        suffix = firstLabel[len(self._pattern):]
        # Be stricter than int(), which also accepts signs, spaces and "_".
        if not suffix or any(ch not in '0123456789' for ch in suffix):
            return None
        lastOctet = int(suffix)
        if lastOctet > 255:
            return None
        return lastOctet

    def _dynamicResponseRequired(self, query):
        """
        Check the query to determine if a dynamic response is required.

        Record type codes for reference:
            A:1
            AAAA:28
            CNAME:5
            MX:15

        @param query: The query to inspect.

        @return: C{True} if the query name matches (see L{_parseLastOctet})
            and the query type is A, MX or CNAME; C{False} otherwise.
        """
        if self._parseLastOctet(query) is None:
            return False
        return query.type in (dns.A, dns.MX, dns.CNAME)

    def _doDynamicResponse(self, query):
        """
        Calculate the response to a query.

        @param query: The query to answer.

        @return: A 3-tuple C{(answers, authority, additional)} of lists.
            C{answers} holds one record for a supported query type and is
            empty for any other type, or for an A query whose name carries
            no valid last octet.  The other two lists are always empty.
        """
        answers = []
        name = query.name.name

        if query.type == dns.A:
            # Only A answers depend on the number embedded in the name.
            lastOctet = self._parseLastOctet(query)
            if lastOctet is not None:
                address = '%s.%s' % (self._network, lastOctet)
                answers.append(dns.RRHeader(
                    name=name,
                    payload=dns.Record_A(address=address)))
        elif query.type == dns.MX:
            # Fixed mail exchanger, the same for every matching name.
            answers.append(dns.RRHeader(
                name, dns.MX, dns.IN, 1200,
                dns.Record_MX(preference=10,
                              name='mail.example.com', ttl=3600)))
        elif query.type == dns.CNAME:
            # Fixed alias target, the same for every matching name.
            answers.append(dns.RRHeader(
                name, dns.CNAME, dns.IN, 0,
                dns.Record_CNAME('example.com', 4321)))

        authority = []
        additional = []
        return answers, authority, additional

    def query(self, query, timeout=None):
        """
        Check if the query should be answered dynamically, otherwise dispatch to
        the fallback resolver.

        @param query: The query to resolve.
        @param timeout: Accepted for interface compatibility; not used.

        @return: A Deferred which fires with the result of
            L{_doDynamicResponse} for a dynamic query, or fails with
            L{error.DomainError} for any other query.
        """
        if self._dynamicResponseRequired(query):
            return defer.succeed(self._doDynamicResponse(query))
        return defer.fail(error.DomainError())
def main():
    """
    Run the server.

    Builds a DNS server factory whose resolvers are tried in order: the
    L{DynamicResolver} first, then a client resolver configured from
    C{/etc/resolv.conf}.  The server listens on 127.0.0.1 port 10053 over
    both UDP and TCP, and this call blocks in the reactor loop.
    """
    port = 10053
    resolvers = [
        DynamicResolver(),
        client.Resolver(resolv='/etc/resolv.conf'),
    ]
    dnsFactory = server.DNSServerFactory(clients=resolvers)
    # UDP traffic goes through a datagram protocol driven by the factory.
    udpProtocol = dns.DNSDatagramProtocol(controller=dnsFactory)

    reactor.listenUDP(port, udpProtocol, interface="127.0.0.1")
    reactor.listenTCP(port, dnsFactory, interface="127.0.0.1")

    reactor.run()
# Script entry point: run the server, then exit with whatever main() returns.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment