Skip to content

Instantly share code, notes, and snippets.

@lotabout
Created March 13, 2021 07:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lotabout/6c8bdadc485a2445cc613849135cb898 to your computer and use it in GitHub Desktop.
Save lotabout/6c8bdadc485a2445cc613849135cb898 to your computer and use it in GitHub Desktop.
Simple traceroute implementation in Python3
#!/usr/bin/env python3
import struct
import socket
import time
# Must be run with root privileges because a raw socket is used.
# ref:
# - https://dnaeon.github.io/traceroute-in-python/
# - https://github.com/openbsd/src/blob/master/usr.sbin/traceroute/traceroute.c
def create_sender(ttl):
    """Create a UDP socket whose outgoing packets carry the given TTL.

    The TTL is applied through a socket option so that the kernel builds the
    IP header and we do not have to construct the packet ourselves.

    Args:
        ttl: IP time-to-live to stamp on every datagram sent by the socket.

    Returns:
        An open ``socket.socket``; the caller is responsible for closing it.

    Raises:
        OSError: if the socket cannot be created or the TTL is rejected.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # IPPROTO_IP is the portable option level; socket.SOL_IP is not
        # defined on every platform.
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
    except Exception:
        # Do not leak the descriptor when the option cannot be applied.
        sock.close()
        raise
    return sock
def create_receiver(timeout=1):
    """Create a raw socket for receiving ICMP response packets.

    Args:
        timeout: timeout in seconds applied to blocking operations on the
            returned socket.

    Returns:
        An open raw ICMP ``socket.socket``; the caller must close it.

    Raises:
        OSError: if the raw socket cannot be created (raw sockets usually
            require elevated privileges).
    """
    # Use the IPPROTO_ICMP constant instead of getprotobyname('icmp'): the
    # latter looks the name up in the system protocol database and fails on
    # hosts where that database is missing.
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        sock.settimeout(timeout)
    except Exception:
        # Do not leak the descriptor when the timeout value is invalid.
        sock.close()
        raise
    return sock
def is_ttl(icmp_packet):
    """Return True if the packet is an ICMP "Time Exceeded" (type 11) message.

    Args:
        icmp_packet: bytes of a received datagram, starting with the IPv4
            header that precedes the ICMP message.

    Returns:
        True when the ICMP type byte equals 11, False otherwise (including
        when the packet is too short to contain an ICMP type byte).
    """
    if not icmp_packet:
        return False
    # The low nibble of the first byte (IHL) gives the IP header length in
    # 32-bit words. It is 5 (20 bytes) unless the header carries options, so
    # a fixed offset of 20 would read the wrong byte for such packets.
    # Values below 5 are invalid; fall back to the minimum header size.
    icmp_offset = max((icmp_packet[0] & 0x0F) * 4, 20)
    # The first byte of the ICMP header is the message type.
    return len(icmp_packet) > icmp_offset and icmp_packet[icmp_offset] == 11
def is_destination_unreacheable(icmp_packet):
    """Return True if the packet is an ICMP "Destination Unreachable" message.

    Args:
        icmp_packet: bytes of a received datagram, starting with the IPv4
            header that precedes the ICMP message.

    Returns:
        True when the ICMP type byte equals 3, False otherwise (including
        when the packet is too short to contain an ICMP type byte).
    """
    if not icmp_packet:
        return False
    # The low nibble of the first byte (IHL) gives the IP header length in
    # 32-bit words. It is 5 (20 bytes) unless the header carries options, so
    # a fixed offset of 20 would read the wrong byte for such packets.
    # Values below 5 are invalid; fall back to the minimum header size.
    icmp_offset = max((icmp_packet[0] & 0x0F) * 4, 20)
    # The first byte of the ICMP header is the message type; 3 means
    # "Destination Unreachable".
    return len(icmp_packet) > icmp_offset and icmp_packet[icmp_offset] == 3
def extract_port(icmp_packet):
    """Extract the UDP destination port of the probe quoted in an ICMP error.

    Layout of the packet: outer IP header, ICMP header (8 bytes), the quoted
    original IP header, then the start of the original UDP header (2-byte
    source port followed by the 2-byte destination port).

    Args:
        icmp_packet: bytes of a received datagram, starting with the outer
            IPv4 header.

    Returns:
        The destination port as an int, or None when the packet is too short
        to contain it.
    """
    if not icmp_packet:
        return None
    # Header lengths come from the IHL nibble (in 32-bit words) rather than a
    # fixed 20 bytes, so packets carrying IP options are parsed correctly.
    # IHL values below 5 are invalid; fall back to the minimum header size.
    outer_len = max((icmp_packet[0] & 0x0F) * 4, 20)
    inner_start = outer_len + 8  # skip the 8-byte ICMP header
    if len(icmp_packet) <= inner_start:
        return None
    inner_len = max((icmp_packet[inner_start] & 0x0F) * 4, 20)
    # Skip the quoted IP header and the 2-byte UDP source port.
    port_offset = inner_start + inner_len + 2
    if len(icmp_packet) < port_offset + 2:
        return None
    return struct.unpack_from('!H', icmp_packet, port_offset)[0]
def udp_ports(start=33434, stop=33535):
    """Return an endless iterator cycling through a range of UDP ports.

    Args:
        start: first port of the cycle (inclusive).
        stop: end of the cycle (exclusive).

    Returns:
        A generator yielding ``start, start + 1, ..., stop - 1`` and then
        starting over, forever.

    Raises:
        ValueError: if the range is empty or falls outside valid port numbers.
    """
    # Validate eagerly: an empty range would otherwise make the generator
    # spin forever without ever yielding a port.
    if not 0 < start < stop <= 65536:
        raise ValueError(f'invalid UDP port range: [{start}, {stop})')

    def cycle():
        while True:
            yield from range(start, stop)

    return cycle()
def wait_for_reply(icmp_sock, expected_port, timeout=1):
    """Wait for the ICMP reply to a probe and return the replying IP address.

    Packets that are neither "Time Exceeded" nor "Destination Unreachable",
    or that quote a different UDP port than ``expected_port``, are ignored.

    Args:
        icmp_sock: socket on which ICMP packets are received.
        expected_port: UDP destination port of the probe being matched.
        timeout: maximum total time in seconds to wait for a matching reply.

    Returns:
        The source IP address (string) of the matching ICMP packet.

    Raises:
        socket.timeout: if no matching reply arrives in time.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    original_timeout = icmp_sock.gettimeout()
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise socket.timeout
            # Bound each receive by the time left, so a stream of unrelated
            # ICMP packets cannot push the total wait past the deadline.
            # Never exceed the timeout the caller configured on the socket.
            if original_timeout is None:
                icmp_sock.settimeout(remaining)
            else:
                icmp_sock.settimeout(min(remaining, original_timeout))
            rec_packet, addr = icmp_sock.recvfrom(1024)
            if not (is_ttl(rec_packet) or is_destination_unreacheable(rec_packet)):
                continue
            if extract_port(rec_packet) != expected_port:
                continue
            return addr[0]
    finally:
        # Leave the socket configured the way the caller handed it in.
        icmp_sock.settimeout(original_timeout)
# NOTE: the default ``ports`` generator is deliberately created once at
# definition time, so successive calls keep advancing through the port cycle
# instead of restarting from the first port.
def trace_once(ip_addr, ttl, ports=udp_ports()):
    """Send one UDP probe with the given TTL and wait for the ICMP reply.

    Args:
        ip_addr: destination IPv4 address of the probe.
        ttl: IP time-to-live for the probe.
        ports: iterator supplying the UDP destination port for the probe.

    Returns:
        A tuple ``(addr, delay)``: the IP address that replied and the
        elapsed time in seconds between sending and receiving.

    Raises:
        socket.timeout: if no matching reply arrives in time.
    """
    # Context managers close both sockets on every exit path, including the
    # case where creating the receiver fails after the sender is open.
    with create_sender(ttl) as sender, create_receiver() as receiver:
        next_port = next(ports)
        # perf_counter is monotonic, so the delay cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        sender.sendto(b'', (ip_addr, next_port))
        addr = wait_for_reply(receiver, next_port)
        return (addr, time.perf_counter() - start_time)
def format_delay(delay):
    """Format a delay given in seconds as milliseconds for display.

    Args:
        delay: elapsed time in seconds, or None when there is no measurement.

    Returns:
        ``'*'`` when ``delay`` is None, otherwise the delay in milliseconds
        with three decimals followed by ``ms``.
    """
    # Identity comparison: `== None` can be hijacked by a custom __eq__.
    if delay is None:
        return '*'
    return f'{delay * 1000:.3f}ms'
def trace(ip_addr, ttl, num_req=3):
    """Probe one hop several times and collect the formatted results.

    Returns a list with ``num_req`` entries, each a tuple
    ``(address, formatted_delay)``; a probe that timed out is recorded as
    ``('*', '*')``.
    """
    def probe():
        # One probe: either the replying address with its delay, or the
        # placeholder entry when the probe timed out.
        try:
            hop_addr, rtt = trace_once(ip_addr, ttl)
            return (hop_addr, format_delay(rtt))
        except socket.timeout:
            return ('*', format_delay(None))

    return [probe() for _ in range(num_req)]
from collections import defaultdict
def print_traces(hop, traces):
    """Print the probe results for one hop, one row per distinct address.

    Args:
        hop: hop number, shown on the first row only.
        traces: iterable of ``(address, formatted_delay)`` tuples.
    """
    # Group delays by replying address, keeping first-seen order.
    merged = defaultdict(list)
    for addr, delay in traces:
        merged[addr].append(delay)

    hop_label = f'{hop:2}'
    for idx, (addr, delays) in enumerate(merged.items()):
        # Continuation rows are padded to the width of the hop label so the
        # address and delay columns line up with the first row.
        prefix = hop_label if idx == 0 else ' ' * len(hop_label)
        print(f'{prefix} {addr:15} {" ".join(delays)}')
if __name__ == '__main__':
    # Command-line entry point: resolve the host, then probe hop by hop with
    # an increasing TTL until the destination answers or the hop limit is hit.
    import argparse

    parser = argparse.ArgumentParser(description='traceroute')
    parser.add_argument('host', help='host')
    args = parser.parse_args()

    try:
        ip_addr = socket.gethostbyname(args.host)
    except socket.gaierror as err:
        # Report an unresolvable host as a usage error, not a traceback.
        parser.error(f'cannot resolve {args.host}: {err}')

    print(f"traceroute to {args.host}({ip_addr})")
    for hop in range(1, 64):
        traces = trace(ip_addr, hop)
        print_traces(hop, traces)
        # Stop as soon as ANY probe of this hop was answered by the
        # destination; checking only the first probe would keep tracing when
        # that single probe happened to time out.
        if any(addr == ip_addr for addr, _delay in traces):
            break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment