Skip to content

Instantly share code, notes, and snippets.

@Laurent-Andrieu
Last active September 23, 2022 11:40
Show Gist options
  • Save Laurent-Andrieu/413ad8ab3c1ff4f140d8228e32c4a26e to your computer and use it in GitHub Desktop.
Save Laurent-Andrieu/413ad8ab3c1ff4f140d8228e32c4a26e to your computer and use it in GitHub Desktop.
Ping avec affichage compteur dynamique
import datetime
import os, sys, socket, struct, select, time
default_timer = time.process_time
ICMP_ECHO_REQUEST = 8
total = 0
invalide = 0
valide = 0
erreur = 0
tps_max = 0
tps_min = 0
vals_moy = {}
tps_moy = 0
def checksum(source_string):
sum = 0
countTo = (len(source_string) / 2) * 2
count = 0
while count < countTo:
thisVal = source_string[count + 1] * 256 + source_string[count]
sum = sum + thisVal
sum = sum & 0xffffffff # Necessary?
count = count + 2
if countTo < len(source_string):
sum = sum + source_string[len(source_string) - 1]
sum = sum & 0xffffffff # Necessary?
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
# Swap bytes. Bugger me if I know why.
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def receive_one_ping(my_socket, ID, timeout):
"""
receive the ping from the socket.
"""
timeLeft = timeout
while True:
startedSelect = default_timer()
whatReady = select.select([my_socket], [], [], timeLeft)
howLongInSelect = (default_timer() - startedSelect)
if not whatReady[0]: # Timeout
return
timeReceived = default_timer()
recPacket, addr = my_socket.recvfrom(1024)
icmpHeader = recPacket[20:28]
type, code, checksum, packetID, sequence = struct.unpack(
"bbHHh", icmpHeader
)
# Filters out the echo request itself.
# This can be tested by pinging 127.0.0.1
# You'll see your own request
if type != 8 and packetID == ID:
bytesInDouble = struct.calcsize("d")
timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
return timeReceived - timeSent
timeLeft = timeLeft - howLongInSelect
if timeLeft <= 0:
return
def send_one_ping(my_socket, dest_addr, ID):
"""
Send one ping to the given >dest_addr<.
"""
dest_addr = socket.gethostbyname(dest_addr)
# Header is type (8), code (8), checksum (16), id (16), sequence (16)
my_checksum = 0
# Make a dummy header with a 0 checksum.
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
bytesInDouble = struct.calcsize("d")
data = bytes((192 - bytesInDouble) * "Q", 'utf-8')
data = struct.pack("d", default_timer()) + data
# Calculate the checksum on the data and the dummy header.
my_checksum = checksum(header + data)
# Now that we have the right checksum, we put that in. It's just easier
# to make up a new header than to stuff it into the dummy.
header = struct.pack(
"bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
)
packet = header + data
my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1
def do_one(dest_addr, timeout):
"""
Returns either the delay (in seconds) or none on timeout.
"""
icmp = socket.getprotobyname("icmp")
try:
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
except PermissionError as e:
e.args = (e.args if e.args else tuple()) + ((
" - Note that ICMP messages can only be sent from processes"
" running as root."
),)
raise
my_ID = os.getpid() & 0xFFFF
send_one_ping(my_socket, dest_addr, my_ID)
delay = receive_one_ping(my_socket, my_ID, timeout)
my_socket.close()
return delay
def write_log(file, content):
try:
with open("{}.txt".format(file), "a") as f:
f.write(content)
f.close()
except IOError as e:
print(e)
def verbose_ping(dest_addr, timeout=3):
global invalide, valide, erreur, total, tps_min, tps_max, tps_moy
date1 = datetime.datetime.now().strftime("%x %X")
date = datetime.datetime.now().strftime("%x")
print("\rPing vers {} | Essai en cours depuis le {}. Timeout: {}s".
format(dest_addr, date1, timeout))
while True:
try:
delay = do_one(dest_addr, timeout)
if not tps_min:
tps_min = round(delay * 1000, 2)
except TypeError as e:
return print("\n Hôte injoignable {}".format(e, datetime.datetime.now().strftime("%x %X")))
except socket.gaierror:
erreur += 1
write_log("Ping log {} - {}".format(dest_addr, date.replace("/", "")),
"Erreur le {}\n".format(datetime.datetime.now().strftime("%x %X")))
return print("\nErreur le {}".format(datetime.datetime.now().strftime("%x %X")))
if delay is None:
invalide += 1
write_log("Ping log {} - {}".format(dest_addr, date.replace("/", "")),
"Timeout le {}\n".format(datetime.datetime.now().strftime("%x %X")))
print("\nTimeout le {}".format(datetime.datetime.now().strftime("%x %X")))
else:
valide += 1
delai = round(delay * 1000, 2)
if delai < tps_min: tps_min = delai
if delai > tps_max: tps_max = delai
try:
vals_moy[delai] += 1
except KeyError:
vals_moy[delai] = 1
tps_moy = round(sum(vals_moy.keys()) / sum(vals_moy.values()), 2)
total += 1
sys.stdout.write("""\rTotal: {} OK: {} NOK: {} Err: {} Min: {}ms Moy: {}ms Max: {}ms Hr: {}""".
format(total, valide, invalide, erreur, tps_min, tps_moy, tps_max, round(total / 3600, 2)))
sys.stdout.flush()
time.sleep(1)
if __name__ == '__main__':
verbose_ping("127.1.0.0")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment