Skip to content

Instantly share code, notes, and snippets.

@Phaeilo
Created December 4, 2016 23:24
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Phaeilo/9a176887050a9617605786d66ca03b71 to your computer and use it in GitHub Desktop.
Save Phaeilo/9a176887050a9617605786d66ca03b71 to your computer and use it in GitHub Desktop.
Bare bones Python 3 DNS server
#!/usr/bin/env python3
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <http://unlicense.org/>
"""
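dns_server.py:
Bare bones Python 3 DNS server that answers every query with "no such name"
(NXDOMAIN). It listens for UDP datagrams on the host:port given by the
DNS_LISTEN environment variable (default 127.0.0.1:8053) and prints each
queried domain name.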
"""
# Module metadata (author, license identifier, version string).
__author__ = "Philip Huppert"
__license__ = "Unlicense"
__version__ = "1.0"
import os
import socket
import struct
import traceback
def parse_query(data):
    """Parse a raw DNS query datagram.

    Args:
        data: The bytes of one DNS message as received from the socket.

    Returns:
        A ``(transaction_id, queries)`` tuple. ``queries`` is a list of
        ``bytes`` objects, one per question, each holding the encoded name
        followed by the 4 bytes of QTYPE and QCLASS, exactly as they
        appeared on the wire.

    Raises:
        ValueError: If the message is not a well-formed standard query
            (too short, is a response, non-zero opcode, truncated flag
            set, carries answer/authority records, has no questions, or a
            question is truncated or uses a compressed/invalid label).
    """
    if len(data) < 12:
        raise ValueError("DNS message is shorter than the 12-byte header")

    # parse DNS header: six big-endian 16-bit fields
    (transaction_id, flags, num_queries,
     num_answer, num_authority, _num_additional) = struct.unpack_from(">6H", data)

    # extract several flags; the parentheses matter because ``>>`` binds
    # tighter than ``&`` in Python
    is_query = (flags & 0x8000) == 0
    opcode = (flags & 0x7800) >> 11
    is_truncated = (flags & 0x0200) != 0

    # verify query structure (explicit raises: asserts vanish under -O)
    if num_queries == 0:
        raise ValueError("DNS message contains no questions")
    if num_answer != 0 or num_authority != 0:
        raise ValueError("DNS query unexpectedly carries answer/authority records")
    if not is_query:
        raise ValueError("DNS message is a response, not a query")
    if opcode != 0:
        raise ValueError("unsupported DNS opcode %d" % opcode)
    if is_truncated:
        raise ValueError("DNS query is marked as truncated")

    # extract queries by walking the length-prefixed labels of each name
    queries = []
    offset = 12
    for _ in range(num_queries):
        start = offset
        while True:
            if offset >= len(data):
                raise ValueError("truncated name in DNS question")
            length = data[offset]
            offset += 1
            if length == 0:
                break
            if length > 63:
                # top bits set: compression pointer or reserved label type
                raise ValueError("compressed or invalid label in DNS question")
            offset += length
        offset += 4  # QTYPE and QCLASS follow the name
        if offset > len(data):
            raise ValueError("truncated DNS question")
        queries.append(data[start:offset])
    return transaction_id, queries
def build_response(transaction_id, queries):
    """Build a "no such name" reply that echoes the given questions.

    Args:
        transaction_id: 16-bit ID copied into the reply header.
        queries: List of encoded question records (``bytes``) to echo back.

    Returns:
        The complete reply as ``bytes``: a 12-byte header followed by the
        concatenated questions. Answer, authority and additional counts
        are all zero.
    """
    # response bit | authoritative server bit | reply code 3 (no such name)
    reply_flags = 0x8000 | 0x0400 | 0x0003
    question_count = len(queries)
    dns_header = struct.pack(
        ">6H", transaction_id, reply_flags, question_count, 0, 0, 0
    )
    # the question section is simply the original questions, unchanged
    return dns_header + b"".join(queries)
def get_domain(query):
    """Return the dotted domain name encoded at the start of a question.

    The name is read as a sequence of length-prefixed labels ending at a
    zero-length label; any bytes after that terminator are ignored.

    Decoding is best-effort because the input comes straight off the
    network: if the data ends before the terminator, the labels read so
    far are returned, and non-ASCII bytes are rendered as ``\\xNN``
    escapes instead of raising.

    Args:
        query: Encoded question record (``bytes``).

    Returns:
        The labels joined with ``"."``; the empty string for the root name.
    """
    labels = []
    offset = 0
    while offset < len(query):
        length = query[offset]
        if length == 0:
            break
        offset += 1
        # slicing clamps at the end of the data, so a lying length byte
        # yields a short label rather than an exception
        labels.append(query[offset:offset + length])
        offset += length
    return ".".join(
        label.decode("ascii", errors="backslashreplace") for label in labels
    )
def main():
    """Run the DNS server loop until interrupted.

    Reads the listen address from the ``DNS_LISTEN`` environment variable
    in ``host:port`` form (default ``127.0.0.1:8053``), binds a UDP socket
    to it, and then for every datagram: parses it, sends back a "no such
    name" reply, and prints each queried domain name. Datagrams that fail
    to parse are reported with a traceback and skipped.

    Raises:
        ValueError: If the port part of ``DNS_LISTEN`` is not an integer.
        OSError: If the socket cannot be bound or socket I/O fails.
    """
    listen = os.getenv("DNS_LISTEN", "127.0.0.1:8053")
    laddr, _, lport = listen.partition(":")
    lport = int(lport)

    # listen for UDP datagrams; the context manager closes the socket on
    # any exit from the loop (Ctrl-C, errors)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((laddr, lport))
        while True:
            # receive and parse query
            data, addr = sock.recvfrom(1024)
            try:
                transaction_id, queries = parse_query(data)
            except Exception:
                # Exception (not a bare except) so that KeyboardInterrupt
                # and SystemExit still stop the server
                traceback.print_exc()
                continue

            # answer query
            resp = build_response(transaction_id, queries)
            sock.sendto(resp, addr)

            # print queried domain names; logging must never take the
            # server down, so a name that cannot be decoded is reported
            # and skipped
            for query in queries:
                try:
                    domain = get_domain(query)
                except Exception:
                    traceback.print_exc()
                    continue
                print(domain)
# Script entry point: run the server only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C ends the server; exit without printing a traceback.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment