Last active
July 18, 2022 23:36
-
-
Save haxwithaxe/2c2ade4b3bf5cc7dadd8fc7ff904618a to your computer and use it in GitHub Desktop.
A quick and dirty no install latency sensor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""A no install latency monitoring tool.""" | |
import argparse | |
import enum | |
import json | |
import logging | |
import math | |
import os | |
import platform | |
import select | |
import socket | |
import struct | |
import sys | |
import threading | |
import time | |
import zlib | |
# Package-style metadata for this single-file script.
__license__ = 'MIT'
__author__ = 'haxwithaxe'
__credits__ = 'This script contains code from ping3 by Kai Yan <kai@kyan001.com> which is MIT licensed.'

# Module-wide logger. Only errors are emitted unless --debug raises the level
# to DEBUG in _main().
log = logging.getLogger('latency-sensor')
log.setLevel(logging.ERROR)

ICMP_DEFAULT_CODE = 0  # the code for ECHO_REPLY and ECHO_REQUEST
# struct layout used by read_ip_header(): version, tos, len, id, flags, ttl,
# protocol, checksum, src_addr, dest_addr (20 bytes in total).
# !=network byte order(big-endian), B=unsigned char, H=unsigned short,
# I=unsigned int
IP_HEADER_FORMAT = "!BBHHHBBHII"
# According to netinet/ip_icmp.h. !=network byte order(big-endian),
# B=unsigned char, H=unsigned short
# Fields: type, code, checksum, id, seq (8 bytes in total).
ICMP_HEADER_FORMAT = "!BBHHH"
# The send timestamp is stored at the start of the ICMP payload.
ICMP_TIME_FORMAT = "!d"  # d=double
# Hard-coded numeric stand-in for socket.SO_BINDTODEVICE.
# NOTE(review): presumably the Linux value - confirm before using elsewhere.
SOCKET_SO_BINDTODEVICE = 25  # socket.SO_BINDTODEVICE
# Sentinel substituted for failed pings by _errors_as_numbers() and tested
# for in median().
_NUMERIC_ERROR = 10**42  # Stand-in for infinity
@enum.unique
class IcmpType(enum.IntEnum):
    """Values of the Type field of an ICMP header.

    Members compare equal to plain integers, so they can be passed
    directly to ``struct.pack`` or compared with unpacked header fields.
    """

    # Echo (ping) request and reply.
    ECHO_REPLY = 0
    ECHO_REQUEST = 8

    # Error reports.
    DESTINATION_UNREACHABLE = 3
    REDIRECT_MESSAGE = 5
    TIME_EXCEEDED = 11
    BAD_IP_HEADER = 12

    # Router discovery.
    ROUTER_ADVERTISEMENT = 9
    ROUTER_SOLICITATION = 10

    # Timestamp request and reply.
    TIMESTAMP = 13
    TIMESTAMP_REPLY = 14
@enum.unique
class IcmpDestinationUnreachableCode(enum.IntEnum):
    """Values of the Code field when the ICMP type is
    DESTINATION_UNREACHABLE (3)."""

    # The destination could not be reached.
    DESTINATION_NETWORK_UNREACHABLE = 0
    DESTINATION_HOST_UNREACHABLE = 1
    DESTINATION_PROTOCOL_UNREACHABLE = 2
    DESTINATION_PORT_UNREACHABLE = 3

    # The packet could not be forwarded as requested.
    FRAGMENTATION_REQUIRED = 4
    SOURCE_ROUTE_FAILED = 5

    # The destination is not known, or the source is isolated.
    DESTINATION_NETWORK_UNKNOWN = 6
    DESTINATION_HOST_UNKNOWN = 7
    SOURCE_HOST_ISOLATED = 8

    # Administrative and type-of-service restrictions.
    NETWORK_ADMINISTRATIVELY_PROHIBITED = 9
    HOST_ADMINISTRATIVELY_PROHIBITED = 10
    NETWORK_UNREACHABLE_FOR_TOS = 11
    HOST_UNREACHABLE_FOR_TOS = 12
    COMMUNICATION_ADMINISTRATIVELY_PROHIBITED = 13

    # Precedence handling.
    HOST_PRECEDENCE_VIOLATION = 14
    PRECEDENCE_CUTOFF_IN_EFFECT = 15
@enum.unique
class IcmpTimeExceededCode(enum.IntEnum):
    """Values of the Code field when the ICMP type is TIME_EXCEEDED (11)."""

    # The packet's Time-To-Live ran out before it reached the destination.
    TTL_EXPIRED = 0
    # Fragments could not be reassembled in time.
    FRAGMENT_REASSEMBLY_TIME_EXCEEDED = 1
class PingError(Exception):
    """Root of every ping-specific exception raised in this module."""
class TimeExceeded(PingError):
    """An ICMP TIME_EXCEEDED message was received instead of a reply."""
class TimeToLiveExpired(TimeExceeded):
    """TIME_EXCEEDED with the TTL_EXPIRED code.

    Attributes:
        message: Human readable description, also used as the exception text.
        ip_header: Parsed IP header of the offending packet, or None.
        icmp_header: Parsed ICMP header of the offending packet, or None.
    """

    def __init__(self, message="Time exceeded: Time To Live expired.",
                 ip_header=None, icmp_header=None):
        super().__init__(message)
        self.message = message
        self.ip_header = ip_header
        self.icmp_header = icmp_header
class DestinationUnreachable(PingError):
    """An ICMP DESTINATION_UNREACHABLE message was received.

    Attributes:
        message: Exception text; includes the reporting host when known.
        ip_header: Parsed IP header as a dict (empty when none was given).
        icmp_header: Parsed ICMP header, or None.
        src_addr: The 'src_addr' entry of ``ip_header``, or None.
    """

    def __init__(self, message="Destination unreachable.", ip_header=None,
                 icmp_header=None):
        # Falsy headers (None or an empty dict) are normalised to a fresh
        # dict so the lookup below is always safe.
        self.ip_header = ip_header if ip_header else {}
        self.icmp_header = icmp_header
        self.src_addr = self.ip_header.get('src_addr')
        if self.src_addr:
            text = f"{message} (Host='{self.src_addr}')"
        else:
            text = message
        self.message = text
        super().__init__(text)
class DestinationHostUnreachable(DestinationUnreachable):
    """DESTINATION_UNREACHABLE with the DESTINATION_HOST_UNREACHABLE code.

    Differs from the parent class only in its default message.
    """

    def __init__(self, message="Destination unreachable: Host unreachable.",
                 ip_header=None, icmp_header=None):
        super().__init__(message=message, ip_header=ip_header,
                         icmp_header=icmp_header)
class HostUnknown(PingError):
    """The destination host name could not be resolved.

    Attributes:
        dest_addr: The name that failed to resolve, or None.
        message: Exception text; names the host when ``dest_addr`` is given.
    """

    def __init__(self, message="Cannot resolve: Unknown host.",
                 dest_addr=None):
        self.dest_addr = dest_addr
        detail = "" if dest_addr is None else " (Host='{}')".format(dest_addr)
        self.message = message + detail
        super().__init__(self.message)
class Timeout(PingError):
    """No matching reply arrived before the timeout elapsed.

    Attributes:
        timeout: The timeout in seconds that was exceeded, or None.
        message: Exception text; includes the timeout when it is given.
    """

    def __init__(self, message="Request timeout for ICMP packet.",
                 timeout=None):
        self.timeout = timeout
        detail = "" if timeout is None else " (Timeout={}s)".format(timeout)
        self.message = message + detail
        super().__init__(self.message)
def checksum(source: bytes) -> int:
    """Compute the 16-bit ones' complement checksum of ``source``.

    RFC1071: https://tools.ietf.org/html/rfc1071
    RFC792: https://tools.ietf.org/html/rfc792

    Bytes at even indexes form the low byte of each 16-bit word and bytes
    at odd indexes form the high byte.

    Args:
        source: The bytes to summarise.

    Returns:
        int: The checksum, always in the range 0..0xffff.
    """
    word_bits = 16
    word_mask = (1 << word_bits) - 1  # 0xffff
    low_bytes = sum(source[0::2])
    high_bytes = sum(source[1::2])
    total = low_bytes + (high_bytes << (word_bits // 2))
    # Ones' complement addition: fold every carry back into the low word.
    while total > word_mask:
        total = (total >> word_bits) + (total & word_mask)
    return ~total & word_mask
def read_icmp_header(raw: bytes) -> dict:
    """Decode a raw ICMP header into a dictionary.

    Args:
        raw: The packed header; must match ICMP_HEADER_FORMAT exactly.

    Returns:
        A dict with the keys 'type', 'code', 'checksum', 'id' and 'seq'.
    """
    icmp_type, icmp_code, icmp_checksum, packet_id, sequence = struct.unpack(
        ICMP_HEADER_FORMAT, raw
    )
    return {
        'type': icmp_type,
        'code': icmp_code,
        'checksum': icmp_checksum,
        'id': packet_id,
        'seq': sequence,
    }
def read_ip_header(raw: bytes) -> dict:
    """Decode a raw IP header into a dictionary.

    Args:
        raw: The packed header; must match IP_HEADER_FORMAT exactly.

    Returns:
        A dict with the keys 'version', 'tos', 'len', 'id', 'flags', 'ttl',
        'protocol', 'checksum', 'src_addr' and 'dest_addr'. The two
        addresses are rendered as dotted-quad strings.
    """
    field_names = ('version', 'tos', 'len', 'id', 'flags', 'ttl',
                   'protocol', 'checksum', 'src_addr', 'dest_addr')
    field_values = struct.unpack(IP_HEADER_FORMAT, raw)
    header = dict(zip(field_names, field_values))
    # Replace each 32-bit address with its dotted-quad form, most
    # significant octet first.
    for key in ('src_addr', 'dest_addr'):
        packed = header[key]
        octets = [str((packed >> shift) & 0xff) for shift in (24, 16, 8, 0)]
        header[key] = ".".join(octets)
    return header
def send_one_ping(sock: socket, dest_addr: str, icmp_id: int, seq: int,
                  size: int):
    """Build one ICMP ECHO_REQUEST and send it to ``dest_addr``.

    ICMP Header (bits): type (8), code (8), checksum (16), id (16),
    sequence (16)
    ICMP Payload: time (double), data
    ICMP Wikipedia:
    https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol

    Args:
        sock: The socket to send on.
        dest_addr: The destination address, either an IP address or a domain
            name. Ex. "192.168.1.1"/"example.com"
        icmp_id: Value for the ICMP id field.
        seq: Value for the ICMP sequence field.
        size: The ICMP payload size in bytes. The payload always starts with
            the packed send time; anything beyond that is filled with "Q".

    Raises:
        HostUnknown: If the destination address cannot be resolved.
    """
    log.debug("Destination address: '%s'", dest_addr)
    try:
        # A domain name is translated into an IP address; an IP address is
        # returned unchanged.
        dest_addr = socket.gethostbyname(dest_addr)
    except socket.gaierror as err:
        raise HostUnknown(dest_addr=dest_addr) from err
    log.debug("Destination IP address: %s", dest_addr)

    def build_header(checksum_field):
        # Pack an ECHO_REQUEST header carrying the given checksum value.
        return struct.pack(ICMP_HEADER_FORMAT, IcmpType.ECHO_REQUEST,
                           ICMP_DEFAULT_CODE, checksum_field, icmp_id, seq)

    # The checksum is computed over a header whose checksum field is zero.
    provisional_header = build_header(0)
    # The current time is stored as a double at the start of the payload.
    filler = ((size - struct.calcsize(ICMP_TIME_FORMAT)) * "Q").encode()
    icmp_payload = struct.pack(ICMP_TIME_FORMAT, time.time()) + filler
    real_checksum = checksum(provisional_header + icmp_payload)
    # checksum() treats the first byte of each pair as the low byte, so the
    # value is run through socket.htons() before being packed with the
    # network-order header format.
    icmp_header = build_header(socket.htons(real_checksum))
    log.debug("Sent ICMP header: %s", read_icmp_header(icmp_header))
    log.debug("Sent ICMP payload: %s", icmp_payload)
    # addr = (ip, port). Port 0 leaves the choice to the OS default.
    sock.sendto(icmp_header + icmp_payload, (dest_addr, 0))
def receive_one_ping(sock: socket, icmp_id: int, seq: int,
                     timeout: int) -> float:
    """Wait for the ECHO_REPLY matching ``icmp_id``/``seq`` and time it.

    IP Header (bits): version (8), type of service (8), length (16), id (16),
    flags (16), time to live (8), protocol (8), checksum (16),
    source ip (32), destination ip (32).
    ICMP Packet (bytes): IP Header (20), ICMP Header (8), ICMP Payload (*).
    Ping Wikipedia: https://en.wikipedia.org/wiki/Ping_(networking_utility)
    ToS (Type of Service) in IP header for ICMP is 0. Protocol in IP header for
    ICMP is 1.

    Packets that do not belong to this request are skipped and the wait
    continues with whatever is left of the timeout.

    Args:
        sock: The same socket that was used to send the ping.
        icmp_id: ICMP packet id. The received id must equal the sent id.
        seq: ICMP packet sequence. The received sequence must equal the sent
            sequence.
        timeout: Timeout in seconds.

    Returns:
        The delay in seconds: the local receive time minus the send time
        stored at the start of the reply payload.

    Raises:
        Timeout: If no matching reply arrives within ``timeout`` seconds.
        TimeToLiveExpired: If the Time-To-Live in IP Header is not large enough
            for destination.
        TimeExceeded: If time was exceeded for a reason other than an expired
            Time-To-Live.
        DestinationHostUnreachable: If the destination host is unreachable.
        DestinationUnreachable: If the destination is unreachable.
    """
    # No IP Header when unprivileged on Linux.
    has_ip_header = (os.name != 'posix') or \
        (platform.system() == 'Darwin') or (sock.type == socket.SOCK_RAW)
    if has_ip_header:
        ip_header_slice = slice(0, struct.calcsize(IP_HEADER_FORMAT))  # [0:20]
        # [20:28]
        icmp_header_slice = slice(
            ip_header_slice.stop,
            ip_header_slice.stop + struct.calcsize(ICMP_HEADER_FORMAT)
        )
    else:
        log.debug("Unprivileged on Linux")
        # [0:8]
        icmp_header_slice = slice(0, struct.calcsize(ICMP_HEADER_FORMAT))
    timeout_time = time.time() + timeout  # Exact time at which we time out.
    log.debug("Timeout time: %s (%s)", time.ctime(timeout_time), timeout_time)
    while True:
        # How many seconds left until timeout.
        timeout_left = timeout_time - time.time()
        # Timeout must be non-negative
        timeout_left = timeout_left if timeout_left > 0 else 0
        log.debug("Timeout left: {:.2f}s".format(timeout_left))
        # Wait until sock is ready to read or time is out.
        selected = select.select([sock, ], [], [], timeout_left)
        if selected[0] == []:  # Timeout
            raise Timeout(timeout=timeout)
        time_recv = time.time()
        log.debug("Received time: %s (%s))", time.ctime(time_recv), time_recv)
        # Single packet size limit is 65535 bytes, but usually the network
        # packet limit is 1500 bytes.
        recv_data, _ = sock.recvfrom(1500)
        if has_ip_header:
            ip_header_raw = recv_data[ip_header_slice]
            ip_header = read_ip_header(ip_header_raw)
            log.debug("Received IP header: %s", ip_header)
        else:
            ip_header = None
        icmp_header_raw = recv_data[icmp_header_slice]
        icmp_payload_raw = recv_data[icmp_header_slice.stop:]
        icmp_header = read_icmp_header(icmp_header_raw)
        log.debug("Received ICMP header: %s", icmp_header)
        log.debug("Received ICMP payload: %s", icmp_payload_raw)
        # When unprivileged on Linux, the ICMP ID is rewritten by the kernel.
        if not has_ip_header:
            # According to https://stackoverflow.com/a/14023878/4528364
            icmp_id = sock.getsockname()[1]
        # TIME_EXCEEDED has no icmp_id and icmp_seq. Usually they are 0.
        if icmp_header['type'] == IcmpType.TIME_EXCEEDED:
            # Windows raw socket cannot get TTL_EXPIRED. See
            # https://stackoverflow.com/questions/43239862/
            # socket-sock-raw-ipproto-icmp-cant-read-ttl-response.
            if icmp_header['code'] == IcmpTimeExceededCode.TTL_EXPIRED:
                # Some routers do not report TTL expired, in which case a
                # timeout is seen instead.
                raise TimeToLiveExpired(ip_header=ip_header,
                                        icmp_header=icmp_header)
            raise TimeExceeded()
        # DESTINATION_UNREACHABLE has no icmp_id and icmp_seq. Usually they are 0.
        if icmp_header['type'] == IcmpType.DESTINATION_UNREACHABLE:
            if icmp_header['code'] == \
                    IcmpDestinationUnreachableCode.DESTINATION_HOST_UNREACHABLE:
                raise DestinationHostUnreachable(ip_header=ip_header,
                                                 icmp_header=icmp_header)
            raise DestinationUnreachable(ip_header=ip_header,
                                         icmp_header=icmp_header)
        # Only packets with a non-zero id are matched against this request.
        if icmp_header['id']:
            # filters out the ECHO_REQUEST itself.
            if icmp_header['type'] == IcmpType.ECHO_REQUEST:
                log.debug("ECHO_REQUEST received. Packet filtered out.")
                continue
            # ECHO_REPLY should match the ICMP ID field.
            if icmp_header['id'] != icmp_id:
                log.debug("ICMP ID dismatch. Packet filtered out.")
                continue
            # ECHO_REPLY should match the ICMP SEQ field.
            if icmp_header['seq'] != seq:
                log.debug("IMCP SEQ dismatch. Packet filtered out.")
                continue
            if icmp_header['type'] == IcmpType.ECHO_REPLY:
                # The sender stored its send time at the start of the payload.
                time_sent = struct.unpack(ICMP_TIME_FORMAT,
                                          icmp_payload_raw[0:struct.calcsize(
                                              ICMP_TIME_FORMAT
                                          )])[0]
                log.debug("Received sent time: %s (%s)", time.ctime(time_sent),
                          time_sent)
                return time_recv - time_sent
        # Anything else is logged and ignored; keep waiting.
        log.debug("Uncatched ICMP packet: %s", icmp_header)
def ping(dest_addr: str, timeout: int = 4, unit: str = "s",
         src_addr: str = None, ttl: int = None, seq: int = 0, size: int = 56,
         interface: str = None) -> float:
    """Send one ping to the destination address and measure the delay.

    Args:
        dest_addr: The destination address, either an IP address or a domain
            name. Ex. "192.168.1.1"/"example.com"
        timeout: Time to wait for a response, in seconds. (default 4)
        unit: The unit of the returned value. "ms" for milliseconds; any
            other value leaves the delay in seconds. (default "s")
        src_addr: The IP address to ping from. This is for multiple network
            interfaces. Ex. "192.168.1.20". (default None)
        ttl: The Time-To-Live of the outgoing packet. A falsy value (None or
            0) leaves the socket's TTL untouched. (default None)
        seq: ICMP packet sequence. (default 0)
        size: The ICMP payload size in bytes. The payload always holds the
            packed send time, so smaller values still produce a payload of
            that size. (default 56)
        interface: The network interface to send from, bound through the
            SO_BINDTODEVICE socket option. Ex. "wlan0". (default None)

    Returns:
        The delay in seconds, or in milliseconds when ``unit`` is "ms".

    Raises:
        PingError: Any error raised by send_one_ping() or receive_one_ping()
            (HostUnknown, Timeout, TimeExceeded, DestinationUnreachable, ...)
            propagates unchanged.
        OSError: If the raw socket cannot be created, bound or configured.
            NOTE(review): raw sockets usually need elevated privileges -
            confirm for the target platform.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                         socket.IPPROTO_ICMP)
    with sock:
        if ttl:
            # The TTL is set through both option levels on a best-effort
            # basis; a failure is only logged.
            try:  # IPPROTO_IP is for Windows and BSD Linux.
                if sock.getsockopt(socket.IPPROTO_IP, socket.IP_TTL):
                    sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
            except OSError as err:
                log.debug(
                    "Set Socket Option `IP_TTL` in `IPPROTO_IP` Failed: %s",
                    err
                )
            try:
                if sock.getsockopt(socket.SOL_IP, socket.IP_TTL):
                    sock.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
            except OSError as err:
                log.debug(
                    "Set Socket Option `IP_TTL` in `SOL_IP` Failed: %s",
                    err
                )
        if interface:
            # Packets will be sent from the specified interface.
            sock.setsockopt(socket.SOL_SOCKET, SOCKET_SO_BINDTODEVICE,
                            f'{interface}\0'.encode())
            log.debug("Socket Interface Binded: %s", interface)
        if src_addr:
            # Only packets sent to src_addr are received.
            sock.bind((src_addr, 0))
            log.debug("Socket Source Address Binded: %s", src_addr)
        if hasattr(threading, 'get_native_id'):
            # threading.get_native_id() is supported >= python3.8.
            thread_id = threading.get_native_id()
        else:
            # current_thread() replaces the deprecated camelCase alias
            # currentThread().
            thread_id = threading.current_thread().ident
        # Thread ids may repeat across processes, so the process id is mixed
        # in to reduce the chance of an icmp_id collision.
        process_id = os.getpid()
        icmp_id = zlib.crc32(
            "{}{}".format(process_id, thread_id).encode()
        ) & 0xffff
        send_one_ping(sock=sock, dest_addr=dest_addr, icmp_id=icmp_id,
                      seq=seq, size=size)
        # in seconds
        delay = receive_one_ping(sock=sock, icmp_id=icmp_id, seq=seq,
                                 timeout=timeout)
    # Defensive guard kept from the original code.
    if delay is None:
        return None
    if unit == "ms":
        delay *= 1000  # in milliseconds
    return delay
def median(delays):
    """Return the median of ``delays``, or None when it is not meaningful.

    Failed pings are expected to be represented by ``_NUMERIC_ERROR`` so that
    they sort after every real delay. The input list is left unmodified.

    Args:
        delays: A list of numbers, possibly containing ``_NUMERIC_ERROR``.

    Returns:
        The middle value for an odd number of entries, the mean of the two
        middle values for an even number, or None when the input is empty or
        a middle value is ``_NUMERIC_ERROR``.
    """
    ordered = sorted(delays)  # sort a copy; do not reorder the caller's list
    count = len(ordered)
    if not count:
        return None
    middle_index = count // 2
    if count % 2:
        middle_value = ordered[middle_index]
        if middle_value == _NUMERIC_ERROR:
            return None
        return middle_value
    # Even length: the two central values sit at middle_index - 1 and
    # middle_index.
    central = ordered[middle_index - 1:middle_index + 1]
    if _NUMERIC_ERROR in central:
        return None
    return sum(central) / 2
def _errors_as_numbers(records):
    """Return a copy of ``records`` with every exception instance replaced
    by ``_NUMERIC_ERROR``; all other entries are kept as they are."""
    return [
        _NUMERIC_ERROR if isinstance(entry, Exception) else entry
        for entry in records
    ]
def _error_report(errors):
    """Summarise a list of ping errors as a dictionary of counters.

    Args:
        errors: The exceptions collected while pinging.

    Returns:
        A dict holding the total error count, the reporting hosts and the
        number of DestinationUnreachable errors, and the number of Timeout
        and TimeToLiveExpired errors.
    """
    unreachable = [
        err for err in errors if isinstance(err, DestinationUnreachable)
    ]
    reporting_hosts = [err.src_addr for err in unreachable if err.src_addr]
    timeout_total = sum(1 for err in errors if isinstance(err, Timeout))
    ttl_total = sum(1 for err in errors if isinstance(err, TimeToLiveExpired))
    return {
        'count': len(errors),
        'dest_unreachable_from': reporting_hosts,
        'dest_unreachable_count': len(unreachable),
        'timeout_count': timeout_total,
        'ttl_exceeded_count': ttl_total,
    }
def _report(records, start_time, log_interval, conf):
    """Emit latency statistics for the most recent ``log_interval`` seconds.

    Nothing is reported unless the whole number of seconds elapsed since
    ``start_time`` is a multiple of ``log_interval`` and enough records have
    been collected to cover the interval.

    Args:
        records: Ping results in chronological order; each entry is either a
            delay (number) or the PingError raised for that ping.
        start_time: ``time.time()`` value taken when monitoring started.
        log_interval: Length of the reporting window in seconds.
        conf: Parsed command line options; ``interval``, ``stdout``,
            ``log_dir`` and ``target`` are read.

    Side effects:
        Prints the report when ``conf.stdout`` is set; otherwise appends it
        to the JSON list stored in ``<log_dir>/<target>.log``.
    """
    log.debug('Evaluating records over %s seconds', log_interval)
    if int(time.time() - start_time) % log_interval:
        return
    window_size = int(log_interval / conf.interval)
    if len(records) < window_size:
        return
    # Keep only the records that fall inside this reporting window.
    interval_slice = 0 - window_size
    records = records[interval_slice:]
    delays = [x for x in records if isinstance(x, (int, float))]
    errors = [x for x in records if isinstance(x, PingError)]
    # A window in which every ping failed has no delays at all; report None
    # for the statistics instead of crashing on min()/max()/division.
    if delays:
        shortest = min(delays)
        longest = max(delays)
        average = sum(delays) / len(delays)
    else:
        shortest = longest = average = None
    # Compare against None explicitly so a median of 0 is still a number.
    median_delay = median(_errors_as_numbers(records))
    content = dict(
        interval=f'{log_interval}s',
        timestamp=time.asctime(),
        min=shortest,
        max=longest,
        average=average,
        median='is error' if median_delay is None else median_delay,
        errors=_error_report(errors)
    )
    if conf.stdout:
        print(content)
        return
    report_filename = os.path.join(conf.log_dir, f'{conf.target}.log')
    log.debug('Writing log to %s', report_filename)
    # Context managers make sure the log file is closed (and flushed) even
    # if reading or writing the JSON fails.
    if os.path.exists(report_filename):
        with open(report_filename, 'r') as report_file:
            report_log = json.load(report_file)
    else:
        report_log = []
    report_log.append(content)
    with open(report_filename, 'w') as report_file:
        json.dump(report_log, report_file, indent=4, sort_keys=True)
def _main():
    """Parse the command line, then ping the target until interrupted.

    After every ping each configured log interval is offered to _report(),
    the record history is trimmed to the longest interval, and the loop
    sleeps for the ping interval. Ctrl-C prints the remaining records.
    """
    cli = argparse.ArgumentParser(
        description=(
            'Pings a remote host and logs statistics about latency. '
            'Avoid abusing services like 8.8.8.8 which respond to ICMP '
            'pings but are not network performance testing services.'
        )
    )
    cli.add_argument('-t', '--target', required=True,
                     help='The remote host to ping.')
    cli.add_argument('-i', '--interval', type=int, default=3,
                     help='Ping interval in seconds.')
    cli.add_argument('-l', '--log-dir', default='/var/log/latency-sensor')
    cli.add_argument(
        '--log-intervals',
        type=int,
        nargs='+',
        default=[60, 300, 900],
        help=(
            'The chunks of time (in seconds) to evaluate for log statistics. '
            'The default is to evaluate every 1, 5, and 15 minutes (60, 300, '
            'and 900 seconds).'
        )
    )
    cli.add_argument(
        '--stdout',
        action='store_true',
        help='Print reports to stdout instead of writing them to disk.'
    )
    cli.add_argument('--debug', action='store_true')
    options = cli.parse_args()
    if options.debug:
        log.setLevel(logging.DEBUG)
    print('Target host:', options.target)
    print('Ping interval:', options.interval)
    print('Log directory:', options.log_dir)
    print('Log intervals:', options.log_intervals)
    start_time = time.time()
    history = []
    if not options.stdout:
        os.makedirs(options.log_dir, exist_ok=True)
    try:
        while True:
            try:
                delay = ping(options.target)
            except PingError as err:
                # Failed pings are recorded too so reports can count them.
                log.error(err)
                history.append(err)
            else:
                history.append(delay)
                log.debug('Pinged %s: %s', options.target, delay)
            for window in options.log_intervals:
                _report(history, start_time, window, options)
            # Prune old records: keep only what the longest window needs.
            keep = int(max(options.log_intervals) / options.interval)
            while len(history) > keep:
                del history[0]
            time.sleep(options.interval)
    except KeyboardInterrupt:
        print(history)
# Start the sensor only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment