Skip to content

Instantly share code, notes, and snippets.

@haxwithaxe
Last active July 18, 2022 23:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save haxwithaxe/2c2ade4b3bf5cc7dadd8fc7ff904618a to your computer and use it in GitHub Desktop.
Save haxwithaxe/2c2ade4b3bf5cc7dadd8fc7ff904618a to your computer and use it in GitHub Desktop.
A quick-and-dirty, no-install latency sensor.
#!/usr/bin/env python3
"""A no install latency monitoring tool."""
import argparse
import enum
import json
import logging
import math
import os
import platform
import select
import socket
import struct
import sys
import threading
import time
import zlib
__license__ = 'MIT'
__author__ = 'haxwithaxe'
__credits__ = 'This script contains code from ping3 by Kai Yan <kai@kyan001.com> which is MIT licensed.'
# Module logger. Level is ERROR by default; _main() lowers it with --debug.
log = logging.getLogger('latency-sensor')
log.setLevel(logging.ERROR)
ICMP_DEFAULT_CODE = 0  # the code for ECHO_REPLY and ECHO_REQUEST
# IPv4 header without options (20 bytes). !=network byte order (big-endian),
# B=unsigned char, H=unsigned short, I=unsigned int.
IP_HEADER_FORMAT = "!BBHHHBBHII"
# ICMP header (8 bytes): type, code, checksum, id, sequence.
# According to netinet/ip_icmp.h.
ICMP_HEADER_FORMAT = "!BBHHH"
ICMP_TIME_FORMAT = "!d"  # d=double; the send timestamp carried in the payload
# Prefer the platform's own constant; 25 is the Linux value and is kept as
# the fallback for Python builds that do not export it.
SOCKET_SO_BINDTODEVICE = getattr(socket, 'SO_BINDTODEVICE', 25)
_NUMERIC_ERROR = 10**42  # Stand-in for infinity
class IcmpType(enum.IntEnum):
    """Values of the Type field of an ICMP header."""

    # Reply to an ECHO_REQUEST; the message receive_one_ping() times.
    ECHO_REPLY = 0
    # Error messages that receive_one_ping() turns into exceptions.
    DESTINATION_UNREACHABLE = 3
    REDIRECT_MESSAGE = 5
    # The message send_one_ping() emits.
    ECHO_REQUEST = 8
    ROUTER_ADVERTISEMENT = 9
    ROUTER_SOLICITATION = 10
    TIME_EXCEEDED = 11
    BAD_IP_HEADER = 12
    TIMESTAMP = 13
    TIMESTAMP_REPLY = 14
class IcmpDestinationUnreachableCode(enum.IntEnum):
    """Values of the ICMP Code field when Type is DESTINATION_UNREACHABLE (3).

    receive_one_ping() singles out DESTINATION_HOST_UNREACHABLE; every other
    code is reported as a generic DestinationUnreachable.
    """

    DESTINATION_NETWORK_UNREACHABLE = 0
    DESTINATION_HOST_UNREACHABLE = 1
    DESTINATION_PROTOCOL_UNREACHABLE = 2
    DESTINATION_PORT_UNREACHABLE = 3
    FRAGMENTATION_REQUIRED = 4
    SOURCE_ROUTE_FAILED = 5
    DESTINATION_NETWORK_UNKNOWN = 6
    DESTINATION_HOST_UNKNOWN = 7
    SOURCE_HOST_ISOLATED = 8
    NETWORK_ADMINISTRATIVELY_PROHIBITED = 9
    HOST_ADMINISTRATIVELY_PROHIBITED = 10
    NETWORK_UNREACHABLE_FOR_TOS = 11
    HOST_UNREACHABLE_FOR_TOS = 12
    COMMUNICATION_ADMINISTRATIVELY_PROHIBITED = 13
    HOST_PRECEDENCE_VIOLATION = 14
    PRECEDENCE_CUTOFF_IN_EFFECT = 15
class IcmpTimeExceededCode(enum.IntEnum):
    """Values of the ICMP Code field when Type is TIME_EXCEEDED (11).

    Only TTL_EXPIRED maps to TimeToLiveExpired; other codes raise the more
    general TimeExceeded.
    """

    TTL_EXPIRED = 0
    FRAGMENT_REASSEMBLY_TIME_EXCEEDED = 1
class PingError(Exception):
    """Root of every error raised by this module while pinging a host."""
class TimeExceeded(PingError):
    """An ICMP TIME_EXCEEDED message was received in response to a probe."""
class TimeToLiveExpired(TimeExceeded):
    """TIME_EXCEEDED with code TTL_EXPIRED: the probe's TTL ran out en route.

    Attributes:
        ip_header: Parsed IP header of the TIME_EXCEEDED packet, or None when
            the socket does not deliver IP headers.
        icmp_header: Parsed ICMP header of the TIME_EXCEEDED packet.
        src_addr: Source address of the TIME_EXCEEDED packet (the host that
            reported the expiry), or None when unknown.
        message: Human-readable description, naming ``src_addr`` if known.
    """

    def __init__(self, message="Time exceeded: Time To Live expired.",
                 ip_header=None, icmp_header=None):
        self.ip_header = ip_header
        self.icmp_header = icmp_header
        # Mirrors DestinationUnreachable so error reports can name the hop.
        self.src_addr = (ip_header or {}).get('src_addr')
        self.message = message
        if self.src_addr:
            self.message = f"{message} (Host='{self.src_addr}')"
        super().__init__(self.message)
class DestinationUnreachable(PingError):
    """An ICMP DESTINATION_UNREACHABLE message was received for a probe.

    Attributes:
        ip_header: Parsed IP header of the error packet ({} when not given).
        icmp_header: Parsed ICMP header of the error packet.
        src_addr: Source address from ``ip_header``, or None.
        message: Description, with the reporting host appended when known.
    """

    def __init__(self, message="Destination unreachable.", ip_header=None,
                 icmp_header=None):
        header = ip_header or {}
        reporter = header.get('src_addr')
        self.ip_header = header
        self.icmp_header = icmp_header
        self.src_addr = reporter
        self.message = (f"{message} (Host='{reporter}')" if reporter
                        else message)
        super().__init__(self.message)
class DestinationHostUnreachable(DestinationUnreachable):
    """DESTINATION_UNREACHABLE whose code is DESTINATION_HOST_UNREACHABLE."""

    def __init__(self, message="Destination unreachable: Host unreachable.",
                 ip_header=None, icmp_header=None):
        super().__init__(message=message, ip_header=ip_header,
                         icmp_header=icmp_header)
class HostUnknown(PingError):
    """The destination name could not be resolved to an IP address.

    Attributes:
        dest_addr: The name that failed to resolve, or None.
        message: Description, with ``dest_addr`` appended when given.
    """

    def __init__(self, message="Cannot resolve: Unknown host.",
                 dest_addr=None):
        self.dest_addr = dest_addr
        suffix = "" if dest_addr is None else f" (Host='{dest_addr}')"
        self.message = message + suffix
        super().__init__(self.message)
class Timeout(PingError):
    """No matching reply arrived before the wait expired.

    Attributes:
        timeout: The timeout that elapsed, in seconds, or None.
        message: Description, with the timeout appended when given.
    """

    def __init__(self, message="Request timeout for ICMP packet.",
                 timeout=None):
        self.timeout = timeout
        suffix = "" if timeout is None else f" (Timeout={timeout}s)"
        self.message = message + suffix
        super().__init__(self.message)
def checksum(source: bytes) -> int:
    """Compute the 16-bit ones'-complement Internet checksum of ``source``.

    RFC1071: https://tools.ietf.org/html/rfc1071
    RFC792: https://tools.ietf.org/html/rfc792

    Bytes are paired little-endian (even-index byte low, odd-index byte
    high); a trailing odd byte is treated as a low byte with a zero high
    byte. The result therefore comes out byte-swapped relative to a
    big-endian (network order) pairing.

    Args:
        source: The bytes to checksum.
    Returns:
        int: The complemented 16-bit sum.
    """
    width = 16
    mask = (1 << width) - 1  # 0xffff
    low_bytes = sum(source[0::2])
    high_bytes = sum(source[1::2])
    total = low_bytes + (high_bytes << (width // 2))
    # Fold every carry out of bit 16 back into the low bits.
    while total >> width:
        total = (total & mask) + (total >> width)
    return mask & ~total
def read_icmp_header(raw: bytes) -> dict:
    """Decode an 8-byte ICMP header laid out as ICMP_HEADER_FORMAT.

    Args:
        raw: Bytes. Exactly the size of ICMP_HEADER_FORMAT.
    Returns:
        A dict with the keys 'type', 'code', 'checksum', 'id' and 'seq'.
    """
    msg_type, code, csum, ident, sequence = struct.unpack(
        ICMP_HEADER_FORMAT, raw)
    return {
        'type': msg_type,
        'code': code,
        'checksum': csum,
        'id': ident,
        'seq': sequence,
    }
def read_ip_header(raw: bytes) -> dict:
    """Decode a 20-byte IPv4 header laid out as IP_HEADER_FORMAT.

    ``version`` holds the whole first byte and ``flags`` the whole 16-bit
    word at bytes 6-7, exactly as unpacked. Source and destination
    addresses are converted to dotted-quad strings.

    Args:
        raw: Bytes. Exactly the size of IP_HEADER_FORMAT.
    Returns:
        A dict keyed by field name.
    """
    field_names = ('version', 'tos', 'len', 'id', 'flags', 'ttl',
                   'protocol', 'checksum', 'src_addr', 'dest_addr')
    values = struct.unpack(IP_HEADER_FORMAT, raw)
    header = {name: value for name, value in zip(field_names, values)}

    def to_dotted_quad(address: int) -> str:
        # Most significant octet first.
        return '.'.join(str((address >> shift) & 0xff)
                        for shift in (24, 16, 8, 0))

    for key in ('src_addr', 'dest_addr'):
        header[key] = to_dotted_quad(header[key])
    return header
def send_one_ping(sock: socket.socket, dest_addr: str, icmp_id: int, seq: int,
                  size: int):
    """Sends one ICMP ECHO_REQUEST to the given destination.

    ICMP Header (bits): type (8), code (8), checksum (16), id (16),
    sequence (16)
    ICMP Payload: time (double), data
    ICMP Wikipedia:
    https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol

    Args:
        sock: The socket to send on.
        dest_addr: The destination address, can be an IP address or a domain
            name. Ex. "192.168.1.1"/"example.com"
        icmp_id: ICMP packet id. Calculated from Process ID and Thread ID.
        seq: ICMP packet sequence, usually increases from 0 in the same
            process.
        size: The ICMP packet payload size in bytes. Note this is only for
            the payload part; values below the size of the timestamp still
            produce a payload that holds the timestamp.
    Raises:
        HostUnknown: If destination address is a domain name and cannot be
            resolved.
    """
    log.debug("Destination address: '%s'", dest_addr)
    try:
        # Domain names are translated into IP addresses; IP addresses are
        # left unchanged.
        dest_addr = socket.gethostbyname(dest_addr)
    except socket.gaierror as err:
        raise HostUnknown(dest_addr=dest_addr) from err
    log.debug("Destination IP address: %s", dest_addr)
    # The checksum is computed over the header with a zero checksum field.
    pseudo_checksum = 0
    icmp_header = struct.pack(ICMP_HEADER_FORMAT, IcmpType.ECHO_REQUEST,
                              ICMP_DEFAULT_CODE, pseudo_checksum, icmp_id, seq)
    # The payload starts with the send time (a double) so the reply can be
    # timed; the remainder is filler up to ``size`` bytes.
    padding = (size - struct.calcsize(ICMP_TIME_FORMAT)) * "Q"
    icmp_payload = (struct.pack(ICMP_TIME_FORMAT, time.time())
                    + padding.encode())
    real_checksum = checksum(icmp_header + icmp_payload)
    # checksum() pairs bytes little-endian, so its result is the byte-swapped
    # form of the network-order checksum. Swap explicitly: socket.htons()
    # only swaps on little-endian hosts and would leave a wrong checksum on
    # big-endian ones.
    real_checksum = ((real_checksum << 8) | (real_checksum >> 8)) & 0xffff
    icmp_header = struct.pack(ICMP_HEADER_FORMAT, IcmpType.ECHO_REQUEST,
                              ICMP_DEFAULT_CODE, real_checksum, icmp_id, seq)
    log.debug("Sent ICMP header: %s", read_icmp_header(icmp_header))
    log.debug("Sent ICMP payload: %s", icmp_payload)
    packet = icmp_header + icmp_payload
    # addr = (ip, port). Port 0 lets the OS apply its default behavior.
    sock.sendto(packet, (dest_addr, 0))
def _quotes_probe(error_payload: bytes, icmp_id: int, seq: int) -> bool:
    """Tell whether an ICMP error message quotes this probe's echo request.

    ICMP error messages carry the offending datagram's IP header followed by
    the start of that datagram (RFC 792). This returns False only when the
    quote positively identifies some other datagram; if the quote is too
    short or malformed to inspect, it returns True so the error is still
    attributed to this probe (the previous behavior).

    Args:
        error_payload: The bytes following the 8-byte ICMP error header.
        icmp_id: ICMP id of the echo request that was sent.
        seq: ICMP sequence of the echo request that was sent.
    Returns:
        bool: False if the error is known to be about another datagram.
    """
    ip_header_size = struct.calcsize(IP_HEADER_FORMAT)
    icmp_header_size = struct.calcsize(ICMP_HEADER_FORMAT)
    if len(error_payload) < ip_header_size:
        return True
    # IHL: low nibble of the first byte, counted in 32-bit words.
    quoted_ip_len = (error_payload[0] & 0x0f) * 4
    if quoted_ip_len < ip_header_size:
        return True
    # Byte 9 of an IPv4 header is the protocol number.
    if error_payload[9] != socket.IPPROTO_ICMP:
        return False
    quoted = error_payload[quoted_ip_len:quoted_ip_len + icmp_header_size]
    if len(quoted) < icmp_header_size:
        return True
    quoted_header = read_icmp_header(quoted)
    return (quoted_header['type'] == IcmpType.ECHO_REQUEST
            and quoted_header['id'] == icmp_id
            and quoted_header['seq'] == seq)


def receive_one_ping(sock: socket.socket, icmp_id: int, seq: int,
                     timeout: float) -> float:
    """Receives the reply to one ping from the socket.

    IP Header (bits): version (8), type of service (8), length (16), id (16),
    flags (16), time to live (8), protocol (8), checksum (16),
    source ip (32), destination ip (32).
    ICMP Packet (bytes): IP Header (20, more with options), ICMP Header (8),
    ICMP Payload (*).
    Ping Wikipedia: https://en.wikipedia.org/wiki/Ping_(networking_utility)

    Packets that do not belong to this probe (other echo traffic, ICMP
    errors quoting some other datagram, truncated packets) are skipped and
    the wait continues until the timeout.

    Args:
        sock: The same socket used for send the ping.
        icmp_id: ICMP packet id. Sent packet id should be identical with
            received packet id.
        seq: ICMP packet sequence. Sent packet sequence should be identical
            with received packet sequence.
        timeout: Timeout in seconds.
    Returns:
        The delay in seconds, computed from the send time echoed back in the
        reply payload.
    Raises:
        Timeout: If no matching reply arrives within ``timeout`` seconds.
        TimeToLiveExpired: If the Time-To-Live in IP Header is not large
            enough for destination.
        TimeExceeded: If time exceeded but Time-To-Live does not expired.
        DestinationHostUnreachable: If the destination host is unreachable.
        DestinationUnreachable: If the destination is unreachable.
    """
    # No IP Header when unprivileged on Linux.
    has_ip_header = (os.name != 'posix') or \
        (platform.system() == 'Darwin') or (sock.type == socket.SOCK_RAW)
    ip_header_size = struct.calcsize(IP_HEADER_FORMAT)  # 20
    icmp_header_size = struct.calcsize(ICMP_HEADER_FORMAT)  # 8
    time_size = struct.calcsize(ICMP_TIME_FORMAT)  # 8
    if not has_ip_header:
        log.debug("Unprivileged on Linux")
        # When unprivileged on Linux, the ICMP ID is rewritten by the kernel.
        # According to https://stackoverflow.com/a/14023878/4528364
        icmp_id = sock.getsockname()[1]
    timeout_time = time.time() + timeout  # Exact time when the wait ends.
    log.debug("Timeout time: %s (%s)", time.ctime(timeout_time), timeout_time)
    while True:
        # Seconds left until timeout; select() needs a non-negative value.
        timeout_left = max(timeout_time - time.time(), 0)
        log.debug("Timeout left: %.2fs", timeout_left)
        readable, _, _ = select.select([sock], [], [], timeout_left)
        if not readable:
            raise Timeout(timeout=timeout)
        time_recv = time.time()
        log.debug("Received time: %s (%s)", time.ctime(time_recv), time_recv)
        # Single packet size limit is 65535 bytes, but usually the network
        # packet limit is 1500 bytes.
        recv_data, _ = sock.recvfrom(1500)
        if has_ip_header:
            if len(recv_data) < ip_header_size:
                log.debug("Truncated IP header. Packet filtered out.")
                continue
            ip_header = read_ip_header(recv_data[:ip_header_size])
            log.debug("Received IP header: %s", ip_header)
            # IHL locates the ICMP header even when IP options are present.
            icmp_start = (recv_data[0] & 0x0f) * 4
            if icmp_start < ip_header_size:
                log.debug("Invalid IP header length. Packet filtered out.")
                continue
        else:
            ip_header = None
            icmp_start = 0
        icmp_end = icmp_start + icmp_header_size
        if len(recv_data) < icmp_end:
            log.debug("Truncated ICMP header. Packet filtered out.")
            continue
        icmp_header = read_icmp_header(recv_data[icmp_start:icmp_end])
        icmp_payload_raw = recv_data[icmp_end:]
        log.debug("Received ICMP header: %s", icmp_header)
        log.debug("Received ICMP payload: %s", icmp_payload_raw)
        # TIME_EXCEEDED and DESTINATION_UNREACHABLE have no id/seq of their
        # own (usually 0). The socket may see errors for other traffic, so
        # only report those whose quoted datagram is this probe.
        if icmp_header['type'] in (IcmpType.TIME_EXCEEDED,
                                   IcmpType.DESTINATION_UNREACHABLE) and \
                not _quotes_probe(icmp_payload_raw, icmp_id, seq):
            log.debug("ICMP error for another datagram. Packet filtered out.")
            continue
        if icmp_header['type'] == IcmpType.TIME_EXCEEDED:
            # Windows raw socket cannot get TTL_EXPIRED. See
            # https://stackoverflow.com/questions/43239862/
            # socket-sock-raw-ipproto-icmp-cant-read-ttl-response.
            if icmp_header['code'] == IcmpTimeExceededCode.TTL_EXPIRED:
                # Some routers do not report TTL expired; a timeout shows
                # instead.
                raise TimeToLiveExpired(ip_header=ip_header,
                                        icmp_header=icmp_header)
            raise TimeExceeded()
        if icmp_header['type'] == IcmpType.DESTINATION_UNREACHABLE:
            if icmp_header['code'] == \
                    IcmpDestinationUnreachableCode.DESTINATION_HOST_UNREACHABLE:
                raise DestinationHostUnreachable(ip_header=ip_header,
                                                 icmp_header=icmp_header)
            raise DestinationUnreachable(ip_header=ip_header,
                                         icmp_header=icmp_header)
        if icmp_header['id']:
            # Filters out the ECHO_REQUEST itself.
            if icmp_header['type'] == IcmpType.ECHO_REQUEST:
                log.debug("ECHO_REQUEST received. Packet filtered out.")
                continue
            # ECHO_REPLY should match the ICMP ID field.
            if icmp_header['id'] != icmp_id:
                log.debug("ICMP ID mismatch. Packet filtered out.")
                continue
            # ECHO_REPLY should match the ICMP SEQ field.
            if icmp_header['seq'] != seq:
                log.debug("ICMP SEQ mismatch. Packet filtered out.")
                continue
        if icmp_header['type'] == IcmpType.ECHO_REPLY:
            if len(icmp_payload_raw) < time_size:
                log.debug("ECHO_REPLY without timestamp. Packet filtered out.")
                continue
            time_sent = struct.unpack(ICMP_TIME_FORMAT,
                                      icmp_payload_raw[:time_size])[0]
            log.debug("Received sent time: %s (%s)", time.ctime(time_sent),
                      time_sent)
            return time_recv - time_sent
        log.debug("Uncaught ICMP packet: %s", icmp_header)
def ping(dest_addr: str, timeout: int = 4, unit: str = "s",
         src_addr: str = None, ttl: int = None, seq: int = 0, size: int = 56,
         interface: str = None) -> float:
    """
    Send one ping to destination address with the given timeout.

    Args:
        dest_addr: The destination address, can be an IP address or a domain
            name. Ex. "192.168.1.1"/"example.com"
        timeout: Time to wait for a response, in seconds. Default is 4s, same
            as Windows CMD. (default 4)
        unit: The unit of returned value. "s" for seconds, "ms" for
            milliseconds; any other value is treated as seconds.
            (default "s")
        src_addr: The IP address to ping from. This is for multiple network
            interfaces. Ex. "192.168.1.20". (default None)
        ttl: The Time-To-Live of the outgoing packet. Default is None, which
            means using OS default ttl -- 64 on Linux and macOS, and 128 on
            Windows. (default None)
        seq: ICMP packet sequence, usually increases from 0 in the same
            process. (default 0)
        size: The ICMP packet payload size in bytes. If the input of this is
            less than the bytes of a double format (usually 8), the size of
            ICMP packet payload is 8 bytes to hold a time. The max should be
            the router_MTU(Usually 1480) - IP_Header(20) - ICMP_Header(8).
            Default is 56, same as in macOS. (default 56)
        interface: LINUX ONLY. The gateway network interface to ping from.
            Ex. "wlan0". (default None)
    Returns:
        The round-trip delay in seconds, or milliseconds when ``unit`` is
        "ms".
    Raises:
        PingError: Every PingError from resolving, sending or receiving
            (HostUnknown, Timeout, TimeExceeded, DestinationUnreachable and
            their subclasses) propagates to the caller.
        OSError: Propagated from socket creation, socket options that are
            not guarded below, bind() or sendto().
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                         socket.IPPROTO_ICMP)
    with sock:
        if ttl:
            try:  # IPPROTO_IP is for Windows and BSD Linux.
                if sock.getsockopt(socket.IPPROTO_IP, socket.IP_TTL):
                    sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
            except OSError as err:
                log.debug(
                    "Set Socket Option `IP_TTL` in `IPPROTO_IP` Failed: %s",
                    err
                )
            try:
                if sock.getsockopt(socket.SOL_IP, socket.IP_TTL):
                    sock.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
            except OSError as err:
                log.debug(
                    "Set Socket Option `IP_TTL` in `SOL_IP` Failed: %s",
                    err
                )
        if interface:
            # Packets will be sent from the specified interface.
            sock.setsockopt(socket.SOL_SOCKET, SOCKET_SO_BINDTODEVICE,
                            f'{interface}\0'.encode())
            log.debug("Socket Interface Binded: %s", interface)
        if src_addr:
            # Only packets sent to src_addr are received.
            sock.bind((src_addr, 0))
            log.debug("Socket Source Address Binded: %s", src_addr)
        if hasattr(threading, 'get_native_id'):
            # threading.get_native_id() is supported >= python3.8.
            thread_id = threading.get_native_id()
        else:
            # current_thread() replaces the deprecated currentThread() alias.
            thread_id = threading.current_thread().ident
        # If ping() runs in different processes, thread_id may be identical,
        # so the process id is mixed in to avoid icmp_id collisions.
        process_id = os.getpid()
        icmp_id = zlib.crc32(
            "{}{}".format(process_id, thread_id).encode()
        ) & 0xffff
        send_one_ping(sock=sock, dest_addr=dest_addr, icmp_id=icmp_id,
                      seq=seq, size=size)
        # In seconds.
        delay = receive_one_ping(sock=sock, icmp_id=icmp_id, seq=seq,
                                 timeout=timeout)
    # Defensive: receive_one_ping() raises Timeout rather than returning None.
    if delay is None:
        return None
    if unit == "ms":
        delay *= 1000  # in milliseconds
    return delay
def median(delays, *, error_value=None):
    """Return the median of ``delays``, or None if it cannot be determined.

    Failed pings are expected to be represented by a sentinel
    (``_NUMERIC_ERROR`` by default, see ``_errors_as_numbers``). The median
    is None when the list is empty or when any value it is computed from is
    the sentinel. The input list is not modified.

    Args:
        delays: Numeric delays, possibly containing the error sentinel.
        error_value: Sentinel marking a failed ping. Defaults to
            ``_NUMERIC_ERROR``.
    Returns:
        The middle value for an odd count, the mean of the two middle values
        for an even count, or None.
    """
    ordered = sorted(delays)
    count = len(ordered)
    if count == 0:
        return None
    if error_value is None:
        error_value = _NUMERIC_ERROR
    middle_index = count // 2
    if count % 2:
        middle = ordered[middle_index]
        return None if middle == error_value else middle
    # Even count: the median is the mean of the two central values, so those
    # are exactly the two that must be checked for the sentinel.
    lower, upper = ordered[middle_index - 1], ordered[middle_index]
    if error_value in (lower, upper):
        return None
    return (lower + upper) / 2
def _errors_as_numbers(records):
    """Return a new list where every exception is replaced by _NUMERIC_ERROR.

    Args:
        records: Ping results: delays and exception instances.
    Returns:
        A list in the same order; non-exception items are kept as-is.
    """
    return [
        _NUMERIC_ERROR if isinstance(item, Exception) else item
        for item in records
    ]
def _error_report(errors):
    """Summarize a list of PingError instances for a report.

    Args:
        errors: Sequence of PingError instances.
    Returns:
        A dict with the total ``count``, the DestinationUnreachable count and
        the hosts that reported them, the Timeout count and the
        TimeToLiveExpired count.
    """
    unreachable = [err for err in errors
                   if isinstance(err, DestinationUnreachable)]
    timeouts = sum(1 for err in errors if isinstance(err, Timeout))
    ttl_expired = sum(1 for err in errors
                      if isinstance(err, TimeToLiveExpired))
    return {
        'count': len(errors),
        'dest_unreachable_from': [err.src_addr for err in unreachable
                                  if err.src_addr],
        'dest_unreachable_count': len(unreachable),
        'timeout_count': timeouts,
        'ttl_exceeded_count': ttl_expired,
    }
def _report(records, start_time, log_interval, conf):
    """Emit a latency summary for the last ``log_interval`` seconds when due.

    A report is produced only when the whole number of seconds elapsed since
    ``start_time`` is a multiple of ``log_interval`` and there are at least
    ``log_interval / conf.interval`` records. The summary is printed when
    ``conf.stdout`` is set; otherwise it is appended to the JSON list stored
    in ``<conf.log_dir>/<conf.target>.log``.

    Args:
        records: Ping results, newest last: delays in seconds or PingError
            instances.
        start_time: ``time.time()`` value when monitoring started.
        log_interval: Length of the summarized window in seconds.
        conf: Parsed command line arguments (uses ``interval``, ``stdout``,
            ``log_dir`` and ``target``).
    """
    log.debug('Evaluating records over %s seconds', log_interval)
    window = int(log_interval / conf.interval)
    # NOTE(review): each loop iteration takes conf.interval plus the ping
    # time, so int(elapsed) can step over an exact multiple of log_interval
    # and skip a report; tracking the last report time would be sturdier.
    if int(time.time() - start_time) % log_interval or len(records) < window:
        return
    records = records[-window:]
    delays = [x for x in records if isinstance(x, (int, float))]
    errors = [x for x in records if isinstance(x, PingError)]
    median_delay = median(_errors_as_numbers(records))
    content = dict(
        interval=f'{log_interval}s',
        timestamp=time.asctime(),
        # A window in which every ping failed has no delays; min()/max() and
        # the average would raise and crash the sensor, so report None.
        min=min(delays) if delays else None,
        max=max(delays) if delays else None,
        average=sum(delays) / len(delays) if delays else None,
        median='is error' if median_delay is None else median_delay,
        errors=_error_report(errors)
    )
    if conf.stdout:
        print(content)
        return
    report_filename = os.path.join(conf.log_dir, f'{conf.target}.log')
    log.debug('Writing log to %s', report_filename)
    if os.path.exists(report_filename):
        with open(report_filename, 'r') as report_file:
            report_log = json.load(report_file)
    else:
        report_log = []
    report_log.append(content)
    # Closing the file guarantees the rewrite is flushed before the next ping.
    with open(report_filename, 'w') as report_file:
        json.dump(report_log, report_file, indent=4, sort_keys=True)
def _main():
    """Command line entry point: ping the target forever and log reports.

    Pings ``--target`` every ``--interval`` seconds, keeps just enough
    results to cover the longest ``--log-intervals`` window, and calls
    ``_report`` for each window after every ping. Ctrl-C prints the retained
    records and returns.
    """
    parser = argparse.ArgumentParser(
        description='Pings a remote host and logs statistics about latency. '
                    'Avoid abusing services like 8.8.8.8 which respond to ICMP '
                    'pings but are not network performance testing services.'
    )
    parser.add_argument('-t', '--target', required=True,
                        help='The remote host to ping.')
    parser.add_argument('-i', '--interval', type=int, default=3,
                        help='Ping interval in seconds.')
    parser.add_argument('-l', '--log-dir', default='/var/log/latency-sensor',
                        help='Directory for the per-target JSON report log.')
    parser.add_argument(
        '--log-intervals',
        type=int,
        nargs='+',
        default=[60, 300, 900],
        help='The chunks of time (in seconds) to evaluate for log statistics. '
             'The default is to evaluate every 1, 5, and 15 minutes (60, 300, '
             'and 900 seconds).'
    )
    parser.add_argument(
        '--stdout',
        action='store_true',
        default=False,
        help='Print reports to stdout instead of writing them to disk.'
    )
    parser.add_argument('--debug', action='store_true', default=False)
    args = parser.parse_args()
    if args.debug:
        # The logger has no handler of its own, and logging's last-resort
        # handler drops records below WARNING, so without a configured
        # handler --debug would print nothing.
        logging.basicConfig()
        log.setLevel(logging.DEBUG)
    print('Target host:', args.target)
    print('Ping interval:', args.interval)
    print('Log directory:', args.log_dir)
    print('Log intervals:', args.log_intervals)
    start_time = time.time()
    records = []
    # Enough records to cover the longest reporting window.
    max_records = int(max(args.log_intervals) / args.interval)
    if not args.stdout:
        os.makedirs(args.log_dir, exist_ok=True)
    try:
        while True:
            try:
                delay = ping(args.target)
            except PingError as err:
                log.error(err)
                records.append(err)
            except PermissionError:
                # Retrying cannot fix missing privileges; stop as before.
                raise
            except OSError as err:
                # Other socket failures (for example while the network is
                # down) are recorded as failed pings instead of ending the
                # monitor.
                log.error(err)
                records.append(PingError(str(err)))
            else:
                records.append(delay)
                log.debug('Pinged %s: %s', args.target, delay)
            for log_interval in args.log_intervals:
                _report(records, start_time, log_interval, args)
            # Prune old records in one slice deletion instead of pop(0) calls.
            excess = len(records) - max_records
            if excess > 0:
                del records[:excess]
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print(records)
if __name__ == '__main__':
    # Start the sensor only when run as a script, not when imported.
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment