Skip to content

Instantly share code, notes, and snippets.

@armandmcqueen
Created November 5, 2020 18:34
Show Gist options
  • Save armandmcqueen/5c615a2ed5d9d5f121c22a1c5f09f4fd to your computer and use it in GitHub Desktop.
Save armandmcqueen/5c615a2ed5d9d5f121c22a1c5f09f4fd to your computer and use it in GitHub Desktop.
import requests
import urllib
import socket
import requests.adapters
import logging
import http.client
import argparse
"""
A very simple Python traceroute(8) implementation
"""
import socket
import random
__all__ = ['Tracer']
class Tracer(object):
def __init__(self, dst, hops=30, port=None):
"""
Initializes a new tracer object
Args:
dst (str): Destination host to probe
hops (int): Max number of hops to probe
"""
self.dst = dst
self.hops = hops
self.ttl = 1
# Pick up a random port in the range 33434-33534
# self.port = random.choice(range(33434, 33535))
if port is None:
self.port = random.choice(range(33434, 33535))
else:
self.port = port
def run(self):
"""
Run the tracer
Raises:
IOError
"""
try:
dst_ip = socket.gethostbyname(self.dst)
except socket.error as e:
raise IOError('Unable to resolve {}: {}', self.dst, e)
text = 'traceroute to {} ({}), {} hops max'.format(
self.dst,
dst_ip,
self.hops
)
print(text)
while True:
receiver = self.create_receiver()
sender = self.create_sender()
sender.sendto(b'', (self.dst, self.port))
addr = None
try:
data, addr = receiver.recvfrom(1024)
except socket.error:
raise IOError('Socket error: {}'.format(e))
finally:
receiver.close()
sender.close()
if addr:
print('{:<4} {}'.format(self.ttl, addr[0]))
else:
print('{:<4} *'.format(self.ttl))
self.ttl += 1
if addr[0] == dst_ip or self.ttl > self.hops:
break
def create_receiver(self):
"""
Creates a receiver socket
Returns:
A socket instance
Raises:
IOError
"""
s = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_RAW,
proto=socket.IPPROTO_ICMP
)
try:
s.bind(('', self.port))
except socket.error as e:
raise IOError('Unable to bind receiver socket: {}'.format(e))
return s
def create_sender(self):
"""
Creates a sender socket
Returns:
A socket instance
"""
s = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_DGRAM,
proto=socket.IPPROTO_UDP
)
s.setsockopt(socket.SOL_IP, socket.IP_TTL, self.ttl)
return s
def main(args):
if args.action == "ping":
ping(args)
elif args.action == "trace":
trace(args)
def trace(args):
print("Tracing Route to", args.ip, "port:", args.port)
tracer = Tracer(
dst=args.ip,
hops=100,
port=args.port
)
tracer.run()
def ping(args):
if not args.endpoint.startswith("/"):
args.endpoint = '/'+args.endpoint
print("Sending and Inspecting Request")
print("requests version:", requests.__version__)
ip = args.ip + ":" + str(args.port)
url = "http://"+ip+args.endpoint
print("Sending request to", url)
logging.basicConfig(level=logging.DEBUG)
httpclient_logger = logging.getLogger("http.client")
def httpclient_logging_patch(level=logging.DEBUG):
"""Enable HTTPConnection debug logging to the logging framework"""
def httpclient_log(*args, **kwargs):
s = "|".join(args)
s += "|-|" + str(kwargs)
httpclient_logger.log(level, s)
# mask the print() built-in in the http.client module to use
# logging instead
http.client.print = httpclient_log
# enable debugging
http.client.HTTPConnection.debuglevel = 1
httpclient_logging_patch()
print("Sending GET")
response = requests.get(url)
print("Got Response:", response.text)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"action",
choices=["ping", "trace"])
parser.add_argument(
"--ip",
help="The IP to send a request to",
type=str,
required=True
)
parser.add_argument(
"--port",
help="The port to send the request to",
default=8080,
)
parser.add_argument(
"--endpoint",
help="The endpoint to ping. Must have a leading slash",
default="/ping",
)
args, leftovers = parser.parse_known_args()
main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment