Skip to content

Instantly share code, notes, and snippets.

@lotabout
Last active December 24, 2023 06:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lotabout/82186f697b49718b5b33c62878431f03 to your computer and use it in GitHub Desktop.
Save lotabout/82186f697b49718b5b33c62878431f03 to your computer and use it in GitHub Desktop.
A simple ping implementation in Python 3 for practicing TCP/IP
#!/usr/bin/env python3
import os
import struct
import socket
import time
def checksum(bytestr):
    """Return the 16-bit Internet checksum of *bytestr*.

    The input is read as big-endian (network order) 16-bit words; an odd
    trailing byte is treated as the high byte of a final word padded with a
    zero byte. The words are added with end-around carry and the one's
    complement of the folded sum is returned as an int in ``0..0xFFFF``,
    suitable for packing with a ``'!H'`` struct field.

    ref
    - https://en.wikipedia.org/wiki/IPv4_header_checksum
    - https://gist.github.com/pklaus/856268/b7194182270c816dee69438b54e42116ab31e53b
    """
    even_length = len(bytestr) - len(bytestr) % 2
    total = 0
    for offset in range(0, even_length, 2):
        total += (bytestr[offset] << 8) | bytestr[offset + 1]

    # last (odd) byte: it occupies the high half of an implicit padded word
    if even_length < len(bytestr):
        total += bytestr[-1] << 8

    # Python ints never overflow, so no carry is lost while summing; fold
    # everything above bit 15 back into the low 16 bits until nothing is left.
    while total > 0xFFFF:
        total = (total >> 16) + (total & 0xFFFF)

    return ~total & 0xFFFF
def echo_request(packet_id, sequence=1, data_size=56):
    """Build a complete ICMP echo request (header plus payload) as bytes.

    The 8-byte header is laid out as follows and is followed by
    ``data_size`` payload bytes:

        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |     Type      |     Code      |          Checksum             |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |           Identifier          |        Sequence Number        |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |     Data ...
        +-+-+-+-+-

    packet_id and sequence are packed as unsigned 16-bit fields, so each
    must fit in 0..65535 or ``struct.pack`` raises ``struct.error``.
    """
    echo_type = 8
    # type(8), code(8), checksum(16), id(16), sequence(16), network byte order
    header_format = '!BBHHH'

    # The checksum covers the whole message with its own field zeroed, so
    # start from a provisional header.
    provisional = struct.pack(header_format, echo_type, 0, 0, packet_id, sequence)

    # Deterministic filler: consecutive byte values starting at 0x42,
    # wrapping around at 0xff.
    # - https://stackoverflow.com/a/34616738
    first_value = 0x42
    payload = bytes(
        value & 0xff for value in range(first_value, first_value + data_size)
    )

    digest = checksum(provisional + payload)
    final_header = struct.pack(header_format, echo_type, 0, digest, packet_id, sequence)
    return final_header + payload
def _parse_echo_reply(packet):
    # Return (ttl, identifier, icmp_length) when `packet` is a raw IPv4
    # datagram carrying an ICMP echo reply, otherwise None.
    icmp_echo_reply = 0
    icmp_header_len = 8
    if not packet:
        return None
    # The low nibble of the first byte is the IP header length in 32-bit
    # words; it exceeds 5 (20 bytes) when the header carries options.
    ip_header_len = (packet[0] & 0x0F) * 4
    if ip_header_len < 20 or len(packet) < ip_header_len + icmp_header_len:
        return None
    icmp_type, _code, _cksum, identifier, _seq = struct.unpack_from(
        '!BBHHH', packet, ip_header_len)
    if icmp_type != icmp_echo_reply:
        return None
    # Byte 8 of the IPv4 header is the TTL field.
    return packet[8], identifier, len(packet) - ip_header_len


def receive_ping(sock, packet_id, time_sent, timeout):
    """Wait for the echo reply whose identifier equals *packet_id*.

    Args:
        sock: socket to read from; received data is expected to start with
            the IPv4 header, followed by the ICMP message.
        packet_id: identifier that was placed in the echo request.
        time_sent: ``time.time()`` value taken when the request was sent;
            used only to compute the returned delay.
        timeout: maximum number of seconds to wait, counted from this call.

    Returns:
        ``(ttl, delay_seconds, icmp_bytes)`` for the matching reply, where
        ``icmp_bytes`` is the ICMP header plus payload length, or ``None``
        if no matching reply arrived within *timeout*.

    Packets that are too short, are not echo replies (for example our own
    echo request seen when pinging the local host) or carry a different
    identifier are skipped; they only consume the time that really elapsed.
    """
    import select

    # A monotonic deadline keeps the budget correct no matter how many
    # unrelated packets are discarded and is immune to wall-clock jumps.
    deadline = time.monotonic() + timeout
    time_left = timeout
    while True:
        readable, _, _ = select.select([sock], [], [], time_left)
        if not readable:  # timeout
            return None

        time_received = time.time()
        # 65535 is the largest possible IPv4 datagram, so nothing is truncated
        rec_packet, _addr = sock.recvfrom(65535)

        reply = _parse_echo_reply(rec_packet)
        if reply is not None:
            ttl, p_id, icmp_size = reply
            if p_id == packet_id:
                return (ttl, time_received - time_sent, icmp_size)

        # received some other packet: keep waiting for the remaining time
        time_left = deadline - time.monotonic()
        if time_left <= 0:
            return None
def send_request(sock, ip_addr, packet):
    """Send *packet* to *ip_addr* through *sock*.

    ``sendto`` reports how many bytes it accepted; any remainder is sent
    again until the whole packet has been handed over. The port in the
    destination tuple is a placeholder value of 1.
    """
    while packet:
        # Use the socket that was passed in, not a module-level global.
        sent = sock.sendto(packet, (ip_addr, 1))
        packet = packet[sent:]
if __name__ == '__main__':
    # Command-line entry point: send ten echo requests to the given host,
    # one per second, and print one result line per request.
    import argparse

    parser = argparse.ArgumentParser(description='Ping')
    parser.add_argument('-s', '--packetsize', help='packetsize', type=int, default=56)
    parser.add_argument('-t', '--timeout', help='timeout seconds for a single request', type=int, default=1)
    parser.add_argument('host', help='host')
    args = parser.parse_args()
    timeout = args.timeout

    # The with-block closes the socket even when name resolution, sending
    # or receiving raises.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname('icmp')) as my_sock:
        ip_addr = socket.gethostbyname(args.host)
        # The ICMP identifier field is 16 bits wide.
        packet_id = os.getpid() % 65536

        print(f'PING {args.host} ({ip_addr}): {args.packetsize} data bytes')
        for seq in range(1, 11):
            packet = echo_request(packet_id, seq, data_size=args.packetsize)
            # Timestamp before sending so the reported delay covers the
            # whole round trip.
            time_sent = time.time()
            send_request(my_sock, ip_addr, packet)
            ping_response = receive_ping(my_sock, packet_id, time_sent, timeout)
            if ping_response is None:
                print(f'failed. Timeout within {timeout} seconds')
            else:
                ttl, delay, payload_bytes = ping_response
                delay = round(delay * 1000.0, 4)
                print(f'{payload_bytes} bytes from {ip_addr}: icmp_seq={seq} ttl={ttl} time={delay} ms')
            time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment