Skip to content

Instantly share code, notes, and snippets.

@iljavs
Created July 22, 2023 02:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iljavs/518c0ddb338a344ffce5bc67370560ea to your computer and use it in GitHub Desktop.
Save iljavs/518c0ddb338a344ffce5bc67370560ea to your computer and use it in GitHub Desktop.
import impacket.dns
import socket
import select
import sys
import multiprocessing
import signal
# DNS server to connect to
dns_server = "8.8.8.8"
# todo
def udp_pre_hook(address: tuple, data: bytes) -> bytes:
dns_packet = impacket.dns.DNS(data)
txid = dns_packet.get_transaction_id()
sport = address[1]
print ("udp request, pktlen: %d TXID: %d, source port: %d" %
(len(data), txid, sport))
return data
# todo
def udp_post_hook(data: bytes) -> bytes:
return data
# todo
def tcp_pre_hook(data: bytes) -> bytes:
return data
# todo
def tcp_post_hook(data: bytes) -> bytes:
return data
def handle_udp(data: bytes, address: tuple, udp_conn: socket.socket):
data = udp_pre_hook(address, data)
dns_server_ip = socket.gethostbyname(dns_server)
print ("Sending DNS query to %s" % dns_server_ip)
dns_server_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
dns_server_udp_sock.sendto(data, (dns_server_ip, 53))
dns_server_udp_sock.settimeout(15)
try:
dns_response_pkt, dns_server_addr = dns_server_udp_sock.recvfrom(1024)
except socket.timeout:
print ("DNS server timeout")
dns_server_udp_sock.close()
udp_conn.close()
return
print ("Received DNS response from %s" % dns_server_addr[0])
dns_server_udp_sock.close()
dns_response_pkt = udp_post_hook(dns_response_pkt)
udp_conn.sendto(dns_response_pkt, address)
print ("Sent DNS response to client: %s" % dns_response_pkt)
udp_conn.close() # ????
print ("Closed UDP connection")
def handle_tcp(tcp_conn: socket.socket):
data = tcp_conn.recv(1024)
print ("Received TCP data from client: %s" % data)
data = tcp_pre_hook(data)
dns_server_ip = socket.gethostbyname(dns_server)
print ("Sending DNS query to %s" % dns_server_ip)
dns_server_tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dns_server_tcp_sock.connect((dns_server_ip, 53))
dns_server_tcp_sock.send(data)
dns_response_pkt = dns_server_tcp_sock.recv(1024)
print ("Received DNS response from %s" % dns_server_ip)
dns_response_pkt = tcp_post_hook(dns_response_pkt)
tcp_conn.send(dns_response_pkt)
print ("Sent DNS response to client: %s" % dns_response_pkt)
tcp_conn.close() # ????
print ("Closed TCP connection")
def signal_handler(sig, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
if __name__ == "__main__":
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.bind(('', 53))
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(('', 53))
tcp.listen(1)
signal.signal(signal.SIGINT, signal_handler)
try:
while True:
rsock, wsock, esock = select.select([udp, tcp], [], [])
for sock in rsock:
if sock == udp:
data, address = udp.recvfrom(1024)
print ("UDP connection from %s" % address[0])
udp_process = multiprocessing.Process(
target=handle_udp,
args=(data, address, udp))
udp_process.start()
elif sock == tcp:
tcp_conn, tcp_addr = tcp.accept()
print ("TCP connection from %s" % tcp_addr[0])
tcp_process = multiprocessing.Process(
target=handle_tcp,
args=(tcp_conn,))
tcp_process.start()
except KeyboardInterrupt:
print("\nCtrl+C received. Exiting gracefully?.")
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment