-
Star
(132)
You must be signed in to star a gist -
Fork
(53)
You must be signed in to fork a gist
-
-
Save pklaus/856268 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2 | |
""" | |
Other Repositories of python-ping | |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
* https://github.com/l4m3rx/python-ping supports Python2 and Python3 | |
* https://bitbucket.org/delroth/python-ping | |
About | |
~~~~~ | |
A pure python ping implementation using raw socket. | |
Note that ICMP messages can only be sent from processes running as root. | |
Derived from ping.c distributed in Linux's netkit. That code is | |
copyright (c) 1989 by The Regents of the University of California. | |
That code is in turn derived from code written by Mike Muuss of the | |
US Army Ballistic Research Laboratory in December, 1983 and | |
placed in the public domain. They have my thanks. | |
Bugs are naturally mine. I'd be glad to hear about them. There are | |
certainly word - size dependenceies here. | |
Copyright (c) Matthew Dixon Cowles, <http://www.visi.com/~mdc/>. | |
Distributable under the terms of the GNU General Public License | |
version 2. Provided with no warranties of any sort. | |
Original Version from Matthew Dixon Cowles: | |
-> ftp://ftp.visi.com/users/mdc/ping.py | |
Rewrite by Jens Diemer: | |
-> http://www.python-forum.de/post-69122.html#69122 | |
Rewrite by Johannes Meyer: | |
-> http://www.python-forum.de/viewtopic.php?p=183720 | |
Revision history | |
~~~~~~~~~~~~~~~~ | |
November 1, 2010 | |
Rewrite by Johannes Meyer: | |
- changed entire code layout | |
- changed some comments and docstrings | |
- replaced time.clock() with time.time() in order | |
to be able to use this module on linux, too. | |
- added global __all__, ICMP_CODE and ERROR_DESCR | |
- merged functions "do_one" and "send_one_ping" | |
- placed icmp packet creation in its own function | |
- removed timestamp from the icmp packet | |
- added function "multi_ping_query" | |
- added class "PingQuery" | |
May 30, 2007 | |
little rewrite by Jens Diemer: | |
- change socket asterisk import to a normal import | |
- replace time.time() with time.clock() | |
- delete "return None" (or change to "return" only) | |
- in checksum() rename "str" to "source_string" | |
November 22, 1997 | |
Initial hack. Doesn't do much, but rather than try to guess | |
what features I (or others) will want in the future, I've only | |
put in what I need now. | |
December 16, 1997 | |
For some reason, the checksum bytes are in the wrong order when | |
this is run under Solaris 2.X for SPARC but it works right under | |
Linux x86. Since I don't know just what's wrong, I'll swap the | |
bytes always and then do an htons(). | |
December 4, 2000 | |
Changed the struct.pack() calls to pack the checksum and ID as | |
unsigned. My thanks to Jerome Poincheval for the fix. | |
""" | |
import time | |
import socket | |
import struct | |
import select | |
import random | |
import asyncore | |
# From /usr/include/linux/icmp.h; your milage may vary. | |
ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris. | |
ICMP_CODE = socket.getprotobyname('icmp') | |
ERROR_DESCR = { | |
1: ' - Note that ICMP messages can only be ' | |
'sent from processes running as root.', | |
10013: ' - Note that ICMP messages can only be sent by' | |
' users or processes with administrator rights.' | |
} | |
__all__ = ['create_packet', 'do_one', 'verbose_ping', 'PingQuery', | |
'multi_ping_query'] | |
def checksum(source_string): | |
# I'm not too confident that this is right but testing seems to | |
# suggest that it gives the same answers as in_cksum in ping.c. | |
sum = 0 | |
count_to = (len(source_string) / 2) * 2 | |
count = 0 | |
while count < count_to: | |
this_val = ord(source_string[count + 1])*256+ord(source_string[count]) | |
sum = sum + this_val | |
sum = sum & 0xffffffff # Necessary? | |
count = count + 2 | |
if count_to < len(source_string): | |
sum = sum + ord(source_string[len(source_string) - 1]) | |
sum = sum & 0xffffffff # Necessary? | |
sum = (sum >> 16) + (sum & 0xffff) | |
sum = sum + (sum >> 16) | |
answer = ~sum | |
answer = answer & 0xffff | |
# Swap bytes. Bugger me if I know why. | |
answer = answer >> 8 | (answer << 8 & 0xff00) | |
return answer | |
def create_packet(id): | |
"""Create a new echo request packet based on the given "id".""" | |
# Header is type (8), code (8), checksum (16), id (16), sequence (16) | |
header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, 0, id, 1) | |
data = 192 * 'Q' | |
# Calculate the checksum on the data and the dummy header. | |
my_checksum = checksum(header + data) | |
# Now that we have the right checksum, we put that in. It's just easier | |
# to make up a new header than to stuff it into the dummy. | |
header = struct.pack('bbHHh', ICMP_ECHO_REQUEST, 0, | |
socket.htons(my_checksum), id, 1) | |
return header + data | |
def do_one(dest_addr, timeout=1): | |
""" | |
Sends one ping to the given "dest_addr" which can be an ip or hostname. | |
"timeout" can be any integer or float except negatives and zero. | |
Returns either the delay (in seconds) or None on timeout and an invalid | |
address, respectively. | |
""" | |
try: | |
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE) | |
except socket.error as e: | |
if e.errno in ERROR_DESCR: | |
# Operation not permitted | |
raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno]))) | |
raise # raise the original error | |
try: | |
host = socket.gethostbyname(dest_addr) | |
except socket.gaierror: | |
return | |
# Maximum for an unsigned short int c object counts to 65535 so | |
# we have to sure that our packet id is not greater than that. | |
packet_id = int((id(timeout) * random.random()) % 65535) | |
packet = create_packet(packet_id) | |
while packet: | |
# The icmp protocol does not use a port, but the function | |
# below expects it, so we just give it a dummy port. | |
sent = my_socket.sendto(packet, (dest_addr, 1)) | |
packet = packet[sent:] | |
delay = receive_ping(my_socket, packet_id, time.time(), timeout) | |
my_socket.close() | |
return delay | |
def receive_ping(my_socket, packet_id, time_sent, timeout): | |
# Receive the ping from the socket. | |
time_left = timeout | |
while True: | |
started_select = time.time() | |
ready = select.select([my_socket], [], [], time_left) | |
how_long_in_select = time.time() - started_select | |
if ready[0] == []: # Timeout | |
return | |
time_received = time.time() | |
rec_packet, addr = my_socket.recvfrom(1024) | |
icmp_header = rec_packet[20:28] | |
type, code, checksum, p_id, sequence = struct.unpack( | |
'bbHHh', icmp_header) | |
if p_id == packet_id: | |
return time_received - time_sent | |
time_left -= time_received - time_sent | |
if time_left <= 0: | |
return | |
def verbose_ping(dest_addr, timeout=2, count=4): | |
""" | |
Sends one ping to the given "dest_addr" which can be an ip or hostname. | |
"timeout" can be any integer or float except negatives and zero. | |
"count" specifies how many pings will be sent. | |
Displays the result on the screen. | |
""" | |
for i in range(count): | |
print('ping {}...'.format(dest_addr)) | |
delay = do_one(dest_addr, timeout) | |
if delay == None: | |
print('failed. (Timeout within {} seconds.)'.format(timeout)) | |
else: | |
delay = round(delay * 1000.0, 4) | |
print('get ping in {} milliseconds.'.format(delay)) | |
print('') | |
class PingQuery(asyncore.dispatcher): | |
def __init__(self, host, p_id, timeout=0.5, ignore_errors=False): | |
""" | |
Derived class from "asyncore.dispatcher" for sending and | |
receiving an icmp echo request/reply. | |
Usually this class is used in conjunction with the "loop" | |
function of asyncore. | |
Once the loop is over, you can retrieve the results with | |
the "get_result" method. Assignment is possible through | |
the "get_host" method. | |
"host" represents the address under which the server can be reached. | |
"timeout" is the interval which the host gets granted for its reply. | |
"p_id" must be any unique integer or float except negatives and zeros. | |
If "ignore_errors" is True, the default behaviour of asyncore | |
will be overwritten with a function which does just nothing. | |
""" | |
asyncore.dispatcher.__init__(self) | |
try: | |
self.create_socket(socket.AF_INET, socket.SOCK_RAW, ICMP_CODE) | |
except socket.error as e: | |
if e.errno in ERROR_DESCR: | |
# Operation not permitted | |
raise socket.error(''.join((e.args[1], ERROR_DESCR[e.errno]))) | |
raise # raise the original error | |
self.time_received = 0 | |
self.time_sent = 0 | |
self.timeout = timeout | |
# Maximum for an unsigned short int c object counts to 65535 so | |
# we have to sure that our packet id is not greater than that. | |
self.packet_id = int((id(timeout) / p_id) % 65535) | |
self.host = host | |
self.packet = create_packet(self.packet_id) | |
if ignore_errors: | |
# If it does not care whether an error occured or not. | |
self.handle_error = self.do_not_handle_errors | |
self.handle_expt = self.do_not_handle_errors | |
def writable(self): | |
return self.time_sent == 0 | |
def handle_write(self): | |
self.time_sent = time.time() | |
while self.packet: | |
# The icmp protocol does not use a port, but the function | |
# below expects it, so we just give it a dummy port. | |
sent = self.sendto(self.packet, (self.host, 1)) | |
self.packet = self.packet[sent:] | |
def readable(self): | |
# As long as we did not sent anything, the channel has to be left open. | |
if (not self.writable() | |
# Once we sent something, we should periodically check if the reply | |
# timed out. | |
and self.timeout < (time.time() - self.time_sent)): | |
self.close() | |
return False | |
# If the channel should not be closed, we do not want to read something | |
# until we did not sent anything. | |
return not self.writable() | |
def handle_read(self): | |
read_time = time.time() | |
packet, addr = self.recvfrom(1024) | |
header = packet[20:28] | |
type, code, checksum, p_id, sequence = struct.unpack("bbHHh", header) | |
if p_id == self.packet_id: | |
# This comparison is necessary because winsocks do not only get | |
# the replies for their own sent packets. | |
self.time_received = read_time | |
self.close() | |
def get_result(self): | |
"""Return the ping delay if possible, otherwise None.""" | |
if self.time_received > 0: | |
return self.time_received - self.time_sent | |
def get_host(self): | |
"""Return the host where to the request has or should been sent.""" | |
return self.host | |
def do_not_handle_errors(self): | |
# Just a dummy handler to stop traceback printing, if desired. | |
pass | |
def create_socket(self, family, type, proto): | |
# Overwritten, because the original does not support the "proto" arg. | |
sock = socket.socket(family, type, proto) | |
sock.setblocking(0) | |
self.set_socket(sock) | |
# Part of the original but is not used. (at least at python 2.7) | |
# Copied for possible compatiblity reasons. | |
self.family_and_type = family, type | |
# If the following methods would not be there, we would see some very | |
# "useful" warnings from asyncore, maybe. But we do not want to, or do we? | |
def handle_connect(self): | |
pass | |
def handle_accept(self): | |
pass | |
def handle_close(self): | |
self.close() | |
def multi_ping_query(hosts, timeout=1, step=512, ignore_errors=False): | |
""" | |
Sends multiple icmp echo requests at once. | |
"hosts" is a list of ips or hostnames which should be pinged. | |
"timeout" must be given and a integer or float greater than zero. | |
"step" is the amount of sockets which should be watched at once. | |
See the docstring of "PingQuery" for the meaning of "ignore_erros". | |
""" | |
results, host_list, id = {}, [], 0 | |
for host in hosts: | |
try: | |
host_list.append(socket.gethostbyname(host)) | |
except socket.gaierror: | |
results[host] = None | |
while host_list: | |
sock_list = [] | |
for ip in host_list[:step]: # select supports only a max of 512 | |
id += 1 | |
sock_list.append(PingQuery(ip, id, timeout, ignore_errors)) | |
host_list.remove(ip) | |
# Remember to use a timeout here. The risk to get an infinite loop | |
# is high, because noone can guarantee that each host will reply! | |
asyncore.loop(timeout) | |
for sock in sock_list: | |
results[sock.get_host()] = sock.get_result() | |
return results | |
if __name__ == '__main__': | |
# Testing | |
verbose_ping('www.heise.de') | |
verbose_ping('google.com') | |
verbose_ping('an-invalid-test-url.com') | |
verbose_ping('127.0.0.1') | |
host_list = ['www.heise.de', 'google.com', '127.0.0.1', | |
'an-invalid-test-url.com'] | |
for host, ping in multi_ping_query(host_list).iteritems(): | |
print(host, '=', ping) |
I copied the checksum function and got the following error: thisVal = ord(string[count+1]) * 256 + ord(string[count]) TypeError: ord() expected string of length 1, but int found
So i changed the code to: thisVal = ord(str(string[count+1])) * 256 + ord(str(string[count])) Now I am getting an error TypeError: ord() expected a character, but string of length 2 found.
Did anybody face this? PS: I am using Python 3 (Don't know how this would affect it unless ord has become stricter in type checking)
I had this issue also, removing the "ord" keywords seems to work.
I've rewritten this script, removing unnecessary functions and variable declarations, also cleaned it up quite a lot, optimised the code and renamed some poorly named functions. It's here in case anyone is interested.
@luizberti - your gist link 404s
It's absolutely an amazing piece of work, I wrote about it too.
https://denizhalil.com/2024/04/06/sending-icmp-packets-with-python-socket-adventure-in-signaling/
Proxies protocols does not support ICMP packet, only TCP. You need a VPN.