Last active
September 23, 2022 11:40
-
-
Save Laurent-Andrieu/413ad8ab3c1ff4f140d8228e32c4a26e to your computer and use it in GitHub Desktop.
Ping avec affichage compteur dynamique
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os, sys, socket, struct, select, time | |
# Clock used both to timestamp outgoing echo requests and to measure how long
# we wait for the reply. It must keep advancing while the process is blocked
# in select(); time.process_time only counts CPU time and excludes time spent
# sleeping, so it cannot measure a network round trip.
default_timer = time.perf_counter

# ICMP message type placed in the header of the packets we send.
ICMP_ECHO_REQUEST = 8

# Running statistics updated by verbose_ping().
total = 0       # number of completed loop iterations (replies + timeouts)
invalide = 0    # number of timeouts
valide = 0      # number of replies received
erreur = 0      # number of name-resolution errors
tps_max = 0     # largest delay seen, in milliseconds
tps_min = 0     # smallest delay seen, in milliseconds
vals_moy = {}   # delay in milliseconds (rounded) -> number of occurrences
tps_moy = 0     # average delay, in milliseconds
def checksum(source_string): | |
sum = 0 | |
countTo = (len(source_string) / 2) * 2 | |
count = 0 | |
while count < countTo: | |
thisVal = source_string[count + 1] * 256 + source_string[count] | |
sum = sum + thisVal | |
sum = sum & 0xffffffff # Necessary? | |
count = count + 2 | |
if countTo < len(source_string): | |
sum = sum + source_string[len(source_string) - 1] | |
sum = sum & 0xffffffff # Necessary? | |
sum = (sum >> 16) + (sum & 0xffff) | |
sum = sum + (sum >> 16) | |
answer = ~sum | |
answer = answer & 0xffff | |
# Swap bytes. Bugger me if I know why. | |
answer = answer >> 8 | (answer << 8 & 0xff00) | |
return answer | |
def receive_one_ping(my_socket, ID, timeout):
    """Wait for the reply to a ping sent on ``my_socket``.

    Args:
        my_socket: the socket the echo request was sent on.
        ID: identifier that was placed in the request's ICMP header; only
            packets carrying the same identifier are accepted.
        timeout: maximum total time to wait, in seconds.

    Returns:
        The difference between the time the packet was received and the
        timestamp stored at the start of its payload (both taken from
        ``default_timer``), or ``None`` if nothing matching arrived before
        the timeout elapsed.
    """
    time_left = timeout
    timestamp_size = struct.calcsize("d")
    # Offsets below assume a 20-byte IP header in front of the ICMP header.
    icmp_start, icmp_end = 20, 28

    while True:
        select_started = default_timer()
        readable, _, _ = select.select([my_socket], [], [], time_left)
        time_in_select = default_timer() - select_started
        if not readable:  # Timeout
            return None

        time_received = default_timer()
        packet, _addr = my_socket.recvfrom(1024)

        # Ignore datagrams too short to hold the ICMP header plus the
        # timestamp instead of letting struct.unpack raise on them.
        if len(packet) >= icmp_end + timestamp_size:
            icmp_type, _code, _csum, packet_id, _sequence = struct.unpack(
                "bbHHh", packet[icmp_start:icmp_end]
            )
            # Filters out the echo request itself (type 8), which shows up
            # when pinging the local host.
            if icmp_type != 8 and packet_id == ID:
                time_sent = struct.unpack(
                    "d", packet[icmp_end:icmp_end + timestamp_size]
                )[0]
                return time_received - time_sent

        # Not our reply: keep waiting for whatever is left of the timeout.
        time_left -= time_in_select
        if time_left <= 0:
            return None
def send_one_ping(my_socket, dest_addr, ID):
    """Build one ICMP echo request and send it to ``dest_addr``.

    The packet is an 8-byte header (type, code, checksum, id, sequence)
    followed by a 192-byte payload: the current ``default_timer()`` value
    packed as a double, then "Q" padding.

    Args:
        my_socket: socket used to send the packet.
        dest_addr: host name or address; resolved with gethostbyname.
        ID: identifier written into the header.
    """
    target_ip = socket.gethostbyname(dest_addr)
    header_format = "bbHHh"

    # The checksum covers the header too, so first build a header whose
    # checksum field is zero.
    provisional_header = struct.pack(
        header_format, ICMP_ECHO_REQUEST, 0, 0, ID, 1
    )

    padding = b"Q" * (192 - struct.calcsize("d"))
    payload = struct.pack("d", default_timer()) + padding

    packet_checksum = checksum(provisional_header + payload)

    # Rebuild the header with the real checksum rather than patching the
    # provisional one in place.
    final_header = struct.pack(
        header_format,
        ICMP_ECHO_REQUEST,
        0,
        socket.htons(packet_checksum),
        ID,
        1,
    )
    # The port number in the address tuple is a placeholder value.
    my_socket.sendto(final_header + payload, (target_ip, 1))
def do_one(dest_addr, timeout):
    """Send a single ping to ``dest_addr`` and wait for its reply.

    Args:
        dest_addr: host name or address to ping.
        timeout: maximum time to wait for the reply, in seconds.

    Returns:
        The delay in seconds, or ``None`` on timeout.

    Raises:
        PermissionError: if the raw ICMP socket cannot be created; a hint
            about needing root is appended to the exception arguments.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except PermissionError as e:
        e.args = (e.args if e.args else tuple()) + ((
            " - Note that ICMP messages can only be sent from processes"
            " running as root."
        ),)
        raise

    # The context manager closes the socket even when sending or receiving
    # raises (for example when the host name cannot be resolved).
    with my_socket:
        my_ID = os.getpid() & 0xFFFF  # the header's id field is 16 bits wide
        send_one_ping(my_socket, dest_addr, my_ID)
        return receive_one_ping(my_socket, my_ID, timeout)
def write_log(file, content):
    """Append ``content`` to the text file ``<file>.txt``.

    Args:
        file: path of the log file without the ".txt" extension.
        content: text to append.

    I/O errors are printed rather than raised, so a failure to write the
    log does not interrupt the caller.
    """
    try:
        # The with-statement closes the file; no explicit close() is needed.
        with open("{}.txt".format(file), "a") as f:
            f.write(content)
    except IOError as e:
        print(e)
def _timestamp():
    """Return the current local date and time formatted with "%x %X"."""
    return datetime.datetime.now().strftime("%x %X")


def verbose_ping(dest_addr, timeout=3):
    """Ping ``dest_addr`` once per second and show live statistics.

    After every attempt a single status line (total, successes, timeouts,
    errors, min / average / max delay in milliseconds, elapsed-hours
    estimate) is rewritten in place on stdout. Timeouts and name-resolution
    errors are appended to a log file named after the destination and the
    start date.

    The loop ends, returning ``None``, when:
      * a timeout occurs before any reply has ever been received (the host
        is reported as unreachable), or
      * the host name cannot be resolved (``socket.gaierror``).

    Args:
        dest_addr: host name or address to ping.
        timeout: time to wait for each reply, in seconds.
    """
    global invalide, valide, erreur, total, tps_min, tps_max, tps_moy

    started = datetime.datetime.now()
    date1 = started.strftime("%x %X")
    date = started.strftime("%x")
    # Slashes from the date would otherwise be read as path separators.
    log_name = "Ping log {} - {}".format(dest_addr, date.replace("/", ""))

    print("\rPing vers {} | Essai en cours depuis le {}. Timeout: {}s".
          format(dest_addr, date1, timeout))

    while True:
        try:
            delay = do_one(dest_addr, timeout)
        except socket.gaierror:
            erreur += 1
            stamp = _timestamp()
            write_log(log_name, "Erreur le {}\n".format(stamp))
            print("\nErreur le {}".format(stamp))
            return

        if delay is None:
            if valide == 0:
                # No reply has ever arrived: treat the host as unreachable.
                print("\n Hôte injoignable {}".format(_timestamp()))
                return
            invalide += 1
            stamp = _timestamp()
            write_log(log_name, "Timeout le {}\n".format(stamp))
            print("\nTimeout le {}".format(stamp))
        else:
            delai = round(delay * 1000, 2)  # seconds -> milliseconds
            # The first reply seeds the minimum; 0 is only the initial value.
            tps_min = delai if valide == 0 else min(tps_min, delai)
            tps_max = max(tps_max, delai)
            valide += 1
            vals_moy[delai] = vals_moy.get(delai, 0) + 1
            # Weighted mean: each distinct delay counts as often as it
            # occurred.
            tps_moy = round(
                sum(value * count for value, count in vals_moy.items())
                / sum(vals_moy.values()),
                2,
            )

        total += 1
        sys.stdout.write("""\rTotal: {} OK: {} NOK: {} Err: {} Min: {}ms Moy: {}ms Max: {}ms Hr: {}""".
                         format(total, valide, invalide, erreur, tps_min, tps_moy, tps_max, round(total / 3600, 2)))
        sys.stdout.flush()
        time.sleep(1)
# Script entry point: start pinging the hard-coded target when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    verbose_ping("127.1.0.0")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment