Skip to content

Instantly share code, notes, and snippets.

@tommie
Created July 21, 2022 16:25
Show Gist options
  • Save tommie/2084e3b8c90c1332788ad29e36f03e78 to your computer and use it in GitHub Desktop.
Save tommie/2084e3b8c90c1332788ad29e36f03e78 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import array
import json
import logging
import random
import signal
import socket
import struct
import subprocess
import sys
import threading
import time
# Sentinel address: PingClient._run() replaces this entry with the host taken
# from `ip route show default` on every measurement round.
DEFAULT_GW = ('0.0.0.0', 0)

# Targets to ping, keyed by (host, port) tuple. The value is the short label
# that get_ping() puts into the generated status text.
ADDRS = {
    DEFAULT_GW: 'gw',
    ('8.8.8.8', 0): 'dns',
}

# Colors attached to generated status entries by get_ping(): RED when there
# is no result for a target, YELLOW when the round trip took more than 0.1 s.
RED = '#FF0000'
YELLOW = '#DDDD00'
class PingClient(object):
    """Background ICMP echo prober for a fixed set of addresses.

    A worker thread is started on construction. Every UPDATE_INTERVAL
    seconds it sends echo requests to each configured address over a
    SOCK_DGRAM/IPPROTO_ICMP socket and records the round-trip time.
    Use get_results() to read the latest measurements and close() (or the
    context-manager protocol) to stop the thread.
    """

    # Seconds to wait between two measurement rounds.
    UPDATE_INTERVAL = 10

    # Number of echo requests sent to a target before giving up.
    _ATTEMPTS = 3

    def __init__(self, addrs):
        """Start pinging ``addrs`` in a background thread.

        Args:
            addrs: iterable of ``(host, port)`` tuples. It is iterated once
                per round. The DEFAULT_GW sentinel is replaced with the
                current default gateway on every round.
        """
        self._addrs = addrs
        self._results = {}
        self._id = random.randrange(2**16 - 1)
        self._seq = 0
        self._running = True
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)
        self._thread = threading.Thread(target=self._run, name='PingClient')
        self._thread.start()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def get_results(self):
        """Return a copy of the latest results.

        Returns:
            dict mapping each configured address to its round-trip time in
            seconds, or to None when no valid reply was received.
        """
        with self._lock:
            return dict(self._results)

    def close(self):
        """Ask the worker thread to stop and wait until it has exited."""
        with self._lock:
            self._running = False
            self._cond.notify_all()
        self._thread.join()

    @property
    def _is_running(self):
        return self._running and self._thread.is_alive()

    @staticmethod
    def _parse_default_gateway(output):
        """Extract the gateway host from ``ip route show default`` output.

        Args:
            output: the command output as text.

        Returns:
            The token following the first ``via`` keyword, or None when no
            line names a gateway (e.g. empty output or a device-only route).
        """
        for line in output.splitlines():
            tokens = line.split()
            if 'via' in tokens:
                pos = tokens.index('via') + 1
                if pos < len(tokens):
                    return tokens[pos]
        return None

    def _default_gateway(self):
        """Return the default gateway as an ``(host, port)`` tuple, or None."""
        output = subprocess.check_output(['ip', 'route', 'show', 'default'])
        host = self._parse_default_gateway(output.decode('utf-8', 'replace'))
        if host is None:
            return None
        return (host, DEFAULT_GW[1])

    def _run(self):
        """Worker loop: measure all targets, publish results, then wait."""
        try:
            while self._is_running:
                try:
                    logging.debug('Entering ping loop...')
                    gateway = self._default_gateway()
                    # Leave room for the per-target retries, which each
                    # increment the sequence number.
                    self._seq = (self._seq + 16) % 2**16
                    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                       socket.IPPROTO_ICMP) as s:
                        s.settimeout(1)
                        results = {}
                        for addr in self._addrs:
                            target = gateway if addr == DEFAULT_GW else addr
                            if target is None:
                                # No default route: nothing to ping.
                                results[addr] = None
                            else:
                                results[addr] = self._ping(
                                    s, target, self._id, self._seq)
                    with self._lock:
                        self._results.update(results)
                except Exception as ex:
                    logging.warning('Request failed (retrying later): %s', ex, exc_info=True)
                with self._lock:
                    # Use a predicate so that a close() issued while a ping
                    # round was in progress (i.e. before we started waiting)
                    # is noticed immediately instead of after a full interval.
                    if self._cond.wait_for(lambda: not self._running,
                                           self.UPDATE_INTERVAL):
                        break
        except BaseException as ex:
            logging.exception('Receive thread failed: %s', ex)
            raise

    @staticmethod
    def _checksum(data):
        """Return the 16-bit one's-complement Internet checksum of ``data``.

        Odd-length input is padded with a zero byte for the computation, and
        carries out of the low 16 bits are folded back in.
        """
        if len(data) % 2:
            data = data + b'\x00'
        total = sum(word for word, in struct.iter_unpack(b'!H', data))
        while total >> 16:
            total = (total & 0xFFFF) + (total >> 16)
        return ~total & 0xFFFF

    def _echo_request2(self, checksum, id, seq, data):
        """Pack an ICMP echo request (type 8, code 0) with a given checksum."""
        return struct.pack(b'!BBHHH', 8, 0, checksum, id, seq) + data

    def _echo_request(self, *args):
        """Build an ICMP echo request with a correct checksum.

        Args:
            *args: ``(id, seq, data)`` as accepted by _echo_request2().
        """
        csum = self._checksum(self._echo_request2(0, *args))
        return self._echo_request2(csum, *args)

    def _parse_echo_reply(self, data):
        """Validate and unpack an ICMP echo reply.

        Returns:
            Tuple ``(id, seq, payload)``.

        Raises:
            ValueError: if the packet is shorter than the ICMP header, fails
                checksum verification, or is not type 0 / code 0.
        """
        if len(data) < 8:
            raise ValueError('Truncated ICMP packet: {} bytes'.format(len(data)))
        icmp_type, code, csum, id, seq = struct.unpack(b'!BBHHH', data[:8])
        # A packet that carries a correct checksum sums to zero.
        ccsum = self._checksum(data)
        if ccsum != 0:
            raise ValueError('Invalid checksum: 0x{:04X}'.format(ccsum))
        if icmp_type != 0:
            raise ValueError('Invalid type: {}'.format(icmp_type))
        if code != 0:
            raise ValueError('Invalid code: {}'.format(code))
        return id, seq, data[8:]

    def _ping(self, s, addr, id, seq):
        """Measure the round-trip time to ``addr``.

        Sends up to three echo requests, incrementing ``seq`` for each one.

        Args:
            s: socket to send on; its timeout bounds each receive.
            addr: ``(host, port)`` tuple; the host is resolved first.
            id: ICMP identifier for the request.
            seq: ICMP sequence number of the first request.

        Returns:
            Round-trip time in seconds, or None if the host could not be
            resolved or no matching reply arrived.
        """
        logging.debug('Ping %s...', addr)
        try:
            addr = (socket.gethostbyname(addr[0]), addr[1])
        except socket.gaierror:
            logging.debug('Failed to look up hostname.', exc_info=True)
            return None
        for attempt in range(self._ATTEMPTS):
            # Monotonic clock: immune to wall-clock adjustments.
            start = time.monotonic()
            try:
                s.sendto(self._echo_request(id, seq, b''), addr)
                data, raddr = s.recvfrom(1024)
                end = time.monotonic()
                if raddr == addr:
                    rid, rseq, data = self._parse_echo_reply(data)
                    logging.debug('%s: id=%d(%d) seq=%d(%d)', addr, rid, id, rseq, seq)
                    if rseq == seq:
                        return end - start
            except socket.timeout:
                logging.debug('Receive timed out.')
            except OSError as ex:
                logging.debug('Send or receive failed: %s', ex)
            except ValueError as ex:
                # A malformed reply only costs this attempt, not the round.
                logging.debug('Ignoring invalid reply: %s', ex)
            if attempt + 1 < self._ATTEMPTS:
                time.sleep(1)
            seq += 1
        return None
def get_ping(client):
    """Yield one status entry per target that is unreachable or slow.

    Args:
        client: object whose ``get_results()`` returns a mapping from
            ``(host, port)`` address to round-trip time in seconds, or None
            when there is no measurement.

    Yields:
        dict with the keys ``full_text``, ``name``, ``markup``, ``instance``
        and ``color``. Targets with a round-trip time of at most 0.1 s
        produce no entry.
    """
    snapshot = client.get_results()
    for target in snapshot:
        rtt = snapshot[target]
        # Healthy targets are not reported at all.
        if rtt is not None and not rtt > 0.1:
            continue

        label = ADDRS[target]
        if rtt is None:
            entry_text, entry_color = 'P {}'.format(label), RED
        else:
            entry_text = 'P {}:{:.0f}ms'.format(label, rtt * 1000)
            entry_color = YELLOW

        entry = {}
        entry['full_text'] = entry_text
        entry['name'] = 'ping'
        entry['markup'] = 'none'
        entry['instance'] = '{}:{}'.format(*target)
        entry['color'] = entry_color
        yield entry
if __name__ == '__main__':
    # Filter for a line-oriented JSON status stream (the entry keys suggest
    # the i3bar protocol; confirm against the producer): every array read
    # from stdin is written to stdout with the ping entries prepended.
    logging.basicConfig(stream=sys.stderr, level=logging.INFO)

    with PingClient(ADDRS.keys()) as pinger:
        # Forward the first line, which contains the version header.
        print(sys.stdin.readline().strip(), flush=True)

        # Forward the second line, which contains the start of the
        # infinite array.
        print(sys.stdin.readline().strip(), flush=True)

        for line in sys.stdin:
            line = line.strip()
            if not line:
                # Nothing to decode or forward.
                continue

            prefix = ''
            # Ignore the comma at the start of lines; it is re-added below.
            if line.startswith(','):
                line, prefix = line[1:], ','

            try:
                j = json.loads(line)
            except ValueError as ex:
                # Do not take the whole filter down for one bad line; pass
                # it through untouched.
                logging.warning('Forwarding undecodable line unchanged: %s', ex)
                output = line
            else:
                try:
                    j = list(get_ping(pinger)) + j
                except Exception as ex:
                    # Exception rather than BaseException, so that
                    # KeyboardInterrupt and SystemExit still terminate us.
                    logging.exception('Failed to generate content (ignored): %s',
                                      ex)
                output = json.dumps(j)

            try:
                print(prefix + output, flush=True)
            except BrokenPipeError as ex:
                logging.info('Terminating due to broken pipe: %s', ex)
                try:
                    # Avoids "ignored exception" logging.
                    sys.stdout.close()
                except BrokenPipeError:
                    pass
                break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment