Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

A pure Python "ping" implementation using raw sockets.

View ping.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
#!/usr/bin/env python
 
"""
A pure Python "ping" implementation, based on a rewrite by Johannes Meyer,
of a script originally by Matthew Dixon Cowles. Which in turn was derived
from "ping.c", distributed in Linux's netkit. The version this was forked
out of can be found here: https://gist.github.com/pklaus/856268
The versions this script derived from are licensed under GPL v2, this makes
mandatory that this is licensed under the same terms as well. If it were up
to me I would have made this edit MIT licensed, sorry for not being able to.
I've rewritten nearly everything for enhanced performance and readability,
and removed unnecessary functions (assynchroneous PingQuery and related).
Those of the original comments who still applied to this script were kept.
This new revision is almost fully compliant with Python 3, and packs several
improvements including: Modular importing, better use of operators, overall
code cleanup, separated verbose mode into a dedicated function, added
generator mode to recursive function and others...
A lot was changed on my rewrite, and as far as my tests went it is working
quite beautifully. In any case, bug reports are very much welcome.
Please note that ICMP messages can only be sent by processes ran as root.
"""
 
from socket import socket, htons, AF_INET, SOCK_RAW, getprotobyname, gethostbyname, error, gaierror
from struct import pack, unpack
from random import random # Possibly switch to os.urandom()
from select import select
from time import time, sleep
from sys import version_info as py_version
 
# Remove this line to make all functions public, or selectively add needed functions to make them public.
__all__ = ["create_packet", "echo", "recursive"]
 
# Python 3 doesn't have the xrange function anymore, this line makes it compliant with both v2 and v3.
xrange = range if py_version[0] >= 3 else xrange
 
ICMP_ECHO_REQUEST = 8
ICMP_CODE = getprotobyname("icmp")
ERROR_DESCR = {1: "ICMP messages can only be sent from processes running as root.",
10013: "ICMP messages can only be sent by users or processes with administrator rights."}
 
 
def checksum(source_string):
checksum = 0
count_to = len(source_string) & -2
count = 0
while count < count_to:
this_val = ord(source_string[count + 1]) * 256 + ord(source_string[count])
checksum += this_val
checksum &= 0xffffffff # Necessary?
count += 2
if count_to < len(source_string):
checksum += ord(source_string[len(source_string) - 1])
checksum &= 0xffffffff # Necessary?
checksum = (checksum >> 16) + (checksum & 0xffff)
checksum += checksum >> 16
answer = ~checksum
answer &= 0xffff
return answer >> 8 | (answer << 8 & 0xff00)
 
 
def create_packet(id):
"""Creates a new echo request packet based on the given "id"."""
# Builds Dummy Header
# Header is type (8), code (8), checksum (16), id (16), sequence (16)
header = pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, id, 1)
data = 192 * "Q"
 
# Builds Real Header
header = pack("bbHHh", ICMP_ECHO_REQUEST, 0, htons(checksum(header + data)), id, 1)
return header + data
 
 
def response_handler(sock, packet_id, time_sent, timeout):
"""Handles packet response, returning either the delay or timing out (returns "None")."""
while True:
ready = select([sock], [], [], timeout)
if ready[0] == []: # Timeout
return
 
time_received = time()
rec_packet, addr = sock.recvfrom(1024)
icmp_header = rec_packet[20:28]
type, code, checksum, rec_id, sequence = unpack("bbHHh", icmp_header)
 
if rec_id == packet_id:
return time_received - time_sent
 
timeout -= time_received - time_sent
if timeout <= 0:
return
 
 
def echo(dest_addr, timeout=1):
"""
Sends one ICMP echo request to a given host.
"timeout" can be any integer or float except for negatives and zero.
Returns either the delay (in seconds), or "None" on timeout or an
invalid address, respectively.
"""
try:
sock = socket(AF_INET, SOCK_RAW, ICMP_CODE)
except error as err:
err_num, err_msg = err.args
if err_num in ERROR_DESCR:
raise error(ERROR_DESCR[err_num]) # Operation not permitted
else:
raise error(err_msg)
 
try:
gethostbyname(dest_addr)
except gaierror:
return
 
packet_id = int((id(timeout) * random()) % 65535)
packet = create_packet(packet_id)
while packet:
# The ICMP protocol does not use a port, but the function
# below expects it, so we just give it a dummy port.
sent = sock.sendto(packet, (dest_addr, 1))
packet = packet[sent:]
 
delay = response_handler(sock, packet_id, time(), timeout)
sock.close()
return delay
 
 
def recursive(dest_addr, count=8, timeout=1, floodlock=1, generator=False):
"""
Pings "dest_addr" "count" times and returns a list of replies or yields
values as they come up if "generator" is True.
"count" is an integer that defines the ammount of requsts to perform.
"timeout" is the number of seconds to wait before dropping request.
"floodlock" regulates the interval between calls to prevent flood
and is set in seconds.
If replied, returns echo delay in seconds. If no response is recorded
"None" is set.
"""
if generator:
for i in xrange(count):
yield echo(dest_addr, timeout)
sleep(floodlock)
else:
log = []
for i in xrange(count):
log.append(echo(dest_addr, timeout))
sleep(floodlock)
return tuple(log)
 
 
###############################################################
# CODE BELOW THIS LINE IS EXPENDABLE FOR DEVELOPMENT PURPOSES #
###############################################################
 
 
def verbose(dest_addr, count=8, timeout=1, floodlock=1, infinite=False):
"""Recursive ping with live feedback. Mind the infinity."""
host = gethostbyname(dest_addr)
print("PING {} ({}): Ammount {}; Timeout {}s".format(dest_addr, gethostbyname(dest_addr), count, timeout))
 
if infinite:
while True:
reply = echo(dest_addr, timeout)
if reply is None:
print("echo timeout... icmp_seq={}".format(seqn))
else:
print("echo from {}: icmp_seq={} delay={} ms").format(host, seqn, round(reply*1000, 3))
sleep(floodlock)
else:
log = []
fail = 0
for seqn in xrange(count):
log.append(echo(dest_addr, timeout))
if log[-1] is None:
print("echo timeout... icmp_seq={}".format(seqn))
fail += 1
else:
print("echo from {}: icmp_seq={} delay={} ms").format(host, seqn, round(log[-1]*1000, 3))
sleep(floodlock)
print("sent={} received={} ratio={}%".format(count, count-fail, (float(count-fail) * 100)/count))
print("{} / {} / {} (min/avg/max in ms)".format(round(min(log)*1000, 3),
round(sum([x*1000 for x in log if x is not None])/len(log), 3), round(max(log)*1000, 3)))
 
 
# Testing
if __name__ == "__main__":
verbose("www.heise.de", 4, 2)
print("")
verbose("google.com", 4, 2)
print("")
verbose("invalid-test-url.com", 4, 2)
print("")
verbose("127.0.0.1", 4, 2)

Since it's derivative work from https://gist.github.com/pklaus/856268 which is GPL licensed, I don't think you can distribute this on something none other than GPL. I was hoping to find a non-GPL alternative but looks like I have to write one myself.

Line 156 is wrong/broken in Python 2 as you can't return from a generator without yielding.

Consider:

-        return tuple(log)
+        yield tuple(log)
+        return

Despite having the correct loops and output in place, you're not actually changing the ICMP sequence number. You can see on line 70 that it is fixed to '1' for every packet.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.