Skip to content

Instantly share code, notes, and snippets.

@toxicantidote
Last active June 22, 2021 01:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toxicantidote/9d89ad6d9003bbc67ed1a7a6289eac9d to your computer and use it in GitHub Desktop.
Save toxicantidote/9d89ad6d9003bbc67ed1a7a6289eac9d to your computer and use it in GitHub Desktop.
Network host ping tool
## Site ping check tool ##
##
## Originally developed and tested on Win10 with Python 2.7. Updates made with
## Python 3.7, but should still work on Python 2.x provided the Tk install
## isn't ancient.
##
## Reads one or more files containing details of sites to ping, pings them at
## the defined frequency, and displays the result in a GUI.
##
## Within the host files, the last word on each line is considered to be the
## target, and the other words on the line are the name displayed in the GUI.
## It is intended that this will allow the use of local hostnames, FQDNs and
## IPs for targets. Lines that are blank or start with # are ignored, allowing
## for neat formatting and comments.
##
## Files listing the hosts to ping. Each readable file becomes one tab.
host_lists = [
    'important hosts.txt',
    'some other hosts.txt',
]
## Pause between two rounds of pinging every host (seconds)
frequency = 30
## Number of worker threads, i.e. how many hosts are pinged at once
threads = 20
## How long to wait for each ping reply (seconds)
timeout = 1
###
### NO EDITING BELOW HERE ###
import sys
import subprocess
import threading
from multiprocessing.pool import ThreadPool
import re
import time
import datetime
if sys.version_info[0] < 3:
import Queue as queue
import Tkinter as tkinter
import tkMessageBox as messagebox
import ttk
else:
import queue
import tkinter
import tkinter.messagebox as messagebox
import tkinter.ttk as ttk
###
class ui():
    """Tk window showing the most recent ping result for every configured host.

    One notebook tab is created per readable host list file. Every host gets
    a name label, whose background colour doubles as its state, and a status
    label holding the time of the last reply. Ping workers report results
    through ``update_time()``, which only queues them; the widgets are
    changed from ``do_ping()``, which runs inside the Tk main loop.

    NOTE: ``ip_widgets`` is keyed by target, so a target listed more than
    once only keeps the row that was created last.
    """

    ## name-label backgrounds that mark a host as not yet pinged / not replying
    COLOUR_UNKNOWN = 'light yellow'
    COLOUR_BAD = 'coral1'

    ## called when the window is closed
    def destroy(self):
        """Tell the GUI loop and the pinger to stop, then close the window."""
        print('Exiting!')
        self.terminate = True
        self.pinger.terminate = True
        self.root.destroy()

    ## called when the class is initialised
    def __init__(self):
        """Build the window, load every host list and start the pinger."""
        self.terminate = False
        self.update_queue = queue.Queue()
        ## created up front so destroy() can always reach it, but only
        ## started once the window has been built successfully
        self.pinger = pinger(self)

        ## basic ui config
        self.root = tkinter.Tk()
        self.root.protocol('WM_DELETE_WINDOW', self.destroy)
        self.root.resizable(0, 0)
        self.root.title('Site ping tool')

        ## create icons (base64 encoded GIF data)
        icon_b64_title = 'R0lGODlhGAAYAIABAAAAAP///yH5BAEKAAEALAAAAAAYABgAAAJPjI+py+0GonxJ2vms0gz3+CEcBCwgeZXkpnoBdh5xLKuvfa96zusn98MFUa2iyHgb9nrFX62CS0ZpOx6NulMuj09mVFR1Uq7fhstFSaspBQA7'
        icon_b64_ok = 'R0lGODlhFAAUAIABAAD+AP///yH5BAEKAAEALAAAAAAUABQAAAIyjB+gi30LmUuxqmYzQNq+Ll0UaDCgeEZltkItO26xtr7kx1a2Oqe4/zthgI7OZOhyFAAAOw=='
        icon_b64_alert = 'R0lGODlhFAAUAIABAP4AAP///yH5BAEKAAEALAAAAAAUABQAAAIyjI+pGwAM24vKOZrsxFLzbnGaR4VdaZIMqVZsi4yG7L4weOHbPPatD9wAeT0ibRhMAgoAOw=='
        icon_b64_unknown = 'R0lGODlhFAAUAIABAP7+AP///yH5BAEKAAEALAAAAAAUABQAAAInDI6paOsP4wtNMootwLxCmoCSeJBa6WnZuZnWyrrsjNKtLdv6kqoFADs='
        self.icon_title = tkinter.PhotoImage(data = icon_b64_title)
        self.icon_ok = tkinter.PhotoImage(data = icon_b64_ok)
        self.icon_alert = tkinter.PhotoImage(data = icon_b64_alert)
        self.icon_unknown = tkinter.PhotoImage(data = icon_b64_unknown)

        ## window icon
        self.root.tk.call('wm', 'iconphoto', self.root._w, self.icon_title)

        ## widget object storage for later reference
        ## nb_widgets: file name -> notebook tab index
        ## ip_widgets: target -> [name label, status label, tab index]
        self.nb_widgets = dict()
        self.ip_widgets = dict()

        ## progress bar for next ping
        self.progress_bar = ttk.Progressbar(self.root, orient = 'horizontal', length = 100, mode = 'determinate', maximum = frequency)
        self.progress_text = tkinter.Label(self.root, text = 'Loading hosts..')
        self.progress_bar.grid(row = 0, column = 0, sticky = 'ew')
        self.progress_text.grid(row = 0, column = 1, sticky = 'w')
        self.progress_bar.start()

        ## notebook
        self.nb = ttk.Notebook(self.root)
        self.nb.grid(row = 1, column = 0, columnspan = 2, sticky = 'news')
        self.root.update()

        ## make a tab for each file
        self.all_hosts = []
        tab_id = 0
        for fname in host_lists:
            hosts = self._load_host_file(fname)
            if hosts is None:
                continue
            self._add_tab(fname, tab_id, hosts)
            tab_id += 1

        if not self.all_hosts:
            messagebox.showerror('No hosts found', 'No hosts were found in any available host list file')

        self.pinger.start()

    @staticmethod
    def _parse_hosts(lines):
        """Return ``[name, target]`` pairs for the host entries in ``lines``.

        The last word of a line is the target and everything before it is
        the display name. A line holding a single word uses that word as
        both name and target. Blank lines and lines whose first
        non-blank character is ``#`` are skipped.
        """
        hosts = []
        for line in lines:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            words = line.rsplit(None, 1)
            target = words[-1]
            name = words[0] if len(words) == 2 else target
            hosts.append([name, target])
        return hosts

    def _load_host_file(self, fname):
        """Read one host list, returning its entries or None if unusable.

        An unreadable or empty file is reported to the user in a message box.
        """
        try:
            with open(fname, 'r') as hfile:
                content = hfile.read().splitlines()
        except (IOError, OSError, UnicodeError):
            messagebox.showerror('Unable to read host list', 'An error occurred while reading ' + fname + '\nDoes the file exist and can it be read by the current user?')
            return None

        hosts = self._parse_hosts(content)
        ## move to the next file if no hosts were found
        if not hosts:
            print('No hosts in ' + fname)
            messagebox.showwarning('No hosts found', 'No host entries were found while reading ' + fname + '\n\nThis file has been ignored')
            return None

        print('Found ' + str(len(hosts)) + ' hosts in ' + fname)
        return hosts

    def _add_tab(self, fname, tab_id, hosts):
        """Create the notebook tab for ``fname`` with one row per host."""
        nb_frame = tkinter.Frame(self.nb)
        ## tab title: underscores become spaces, everything from the first
        ## dot onwards is dropped
        clean_name = re.sub(r'\_', ' ', fname)
        clean_name = re.sub(r'\..+$', '', clean_name)
        self.nb.add(nb_frame, text = clean_name.capitalize(), image = self.icon_unknown, compound = tkinter.TOP)
        self.nb_widgets[fname] = tab_id

        ## make the gui elements for each host
        for row, (name, address) in enumerate(hosts):
            name_label = tkinter.Label(nb_frame, text = name, background = self.COLOUR_UNKNOWN, anchor = 'w', padx = 10)
            status_label = tkinter.Label(nb_frame, text = 'No replies seen', width = 22, anchor = 'w', padx = 10)
            name_label.grid(row = row, column = 0, sticky = 'ew')
            status_label.grid(row = row, column = 1, sticky = 'ew')
            self.ip_widgets[address] = [name_label, status_label, tab_id]
            self.all_hosts.append(address)

    def run(self):
        """Schedule the ping loop and enter the Tk main loop."""
        self.root.after(100, self.do_ping)
        self.root.mainloop()

    def do_ping(self):
        """Alternate between a round of pings and a countdown until closed.

        This loop never hands control back to Tk, so it calls
        ``root.update()`` itself to keep the window responsive.
        """
        try:
            self.root.update()
            while not self.terminate:
                self.pinger.start_ping(self.get_hosts())
                self.progress_bar.config(mode = 'indeterminate')
                self.progress_text.config(text = 'Waiting for replies...')
                self.progress_bar.start()
                while self.pinger.active and not self.terminate:
                    self._apply_updates()
                    self.root.update()
                    time.sleep(0.5)
                if self.terminate:
                    break

                end = time.time() + frequency
                self.progress_bar.stop()
                self.progress_bar.config(maximum = frequency, mode = 'determinate')
                while time.time() < end and not self.terminate:
                    ## accept ping data any time..
                    self._apply_updates()
                    remaining = int((end - time.time()) + 1)
                    self.progress_bar.config(value = (frequency - remaining))
                    remaining_text = str(datetime.timedelta(seconds = remaining))
                    self.progress_text.config(text = 'Next ping in ' + remaining_text)
                    self.root.update()
                    time.sleep(0.5)
        except tkinter.TclError:
            ## the widgets are gone because the window was closed mid-loop
            return

    def _apply_updates(self):
        """Apply queued results and refresh the tab icons if any arrived."""
        if self._update_times():
            self.update_tabs()

    def get_hosts(self):
        """Return the list of targets to ping."""
        return self.all_hosts

    def update_time(self, host, colour, message = None):
        """Queue a result for ``host``; this is what the ping workers call.

        ``colour`` becomes the background of the name label and ``message``,
        when given, the text of the status label. Nothing is queued once the
        window is closing.
        """
        if not self.terminate:
            self.update_queue.put([host, colour, message])

    ## update last ping time for a host
    def _update_times(self):
        """Apply all queued results to the widgets.

        Returns True if at least one host row was changed, otherwise False.
        """
        changed = False
        while True:
            try:
                host, colour, message = self.update_queue.get_nowait()
            except queue.Empty:
                return changed
            try:
                widgets = self.ip_widgets[host]
            except KeyError:
                print('Error updating time for ' + str(host))
                continue
            widgets[0].configure(background = colour)
            if message is not None:
                widgets[1].configure(text = str(message))
            changed = True

    @staticmethod
    def _classify_tabs(states):
        """Split tab indexes into those with bad and with unknown hosts.

        ``states`` is an iterable of ``(background colour, tab index)``
        pairs, one per host row. Returns ``(bad, unknown)`` as two
        independent sets.
        """
        bad = set()
        unknown = set()
        for colour, tab_id in states:
            if colour == ui.COLOUR_UNKNOWN:
                unknown.add(tab_id)
            elif colour == ui.COLOUR_BAD:
                bad.add(tab_id)
        return bad, unknown

    ## update tab icons to reflect the current state
    def update_tabs(self, event = None):
        """Give each tab the alert, unknown or ok icon based on its hosts.

        A tab with any bad host gets the alert icon, otherwise one with any
        unpinged host gets the unknown icon, otherwise the ok icon.
        ``event`` is accepted but not used.
        """
        states = [(str(widgets[0]['background']), widgets[2]) for widgets in self.ip_widgets.values()]
        bad, unknown = self._classify_tabs(states)

        ## update tab icons
        for tab_id in self.nb_widgets.values():
            if tab_id in bad:
                icon = self.icon_alert
            elif tab_id in unknown:
                icon = self.icon_unknown
            else:
                icon = self.icon_ok
            self.nb.tab(tab_id, image = icon)
class pinger(threading.Thread):
    """Controller thread that owns the ping queue and its worker threads.

    ``start_ping()`` queues one round of targets and raises ``active``.
    ``run()`` lowers ``active`` again once every queued target has been
    processed. Each worker runs the system ``ping`` command once per target
    and reports the outcome through ``gui.update_time()``.
    """

    def __init__(self, gui):
        """Prepare the controller; ``gui`` must provide ``update_time()``."""
        ## initialise self as a thread
        threading.Thread.__init__(self)
        self.name = 'Ping-controller'
        ## never keep the process alive if the main thread dies unexpectedly
        self.daemon = True
        self.gui = gui
        self.pq = queue.Queue()
        self.terminate = False
        self.active = False

    def __del__(self):
        self.terminate = True

    def run(self):
        """Start the workers, then clear ``active`` after each round."""
        ## make a thread pool
        for i in range(threads):
            t = threading.Thread(target = self.ping, name = 'Ping-worker-' + str(i))
            t.daemon = True
            t.start()

        while not self.terminate:
            if self.active:
                ## returns once task_done() was called for every target
                self.pq.join()
                self.active = False
            ## sleep here to reduce CPU usage from polling
            time.sleep(0.1)

    def queue_size(self):
        """Return the approximate number of targets still waiting."""
        return self.pq.qsize()

    def start_ping(self, targets):
        """Queue ``targets`` for pinging unless a round is still running."""
        if self.active:
            print('ERROR: Not starting new ping. Existing pings in queue')
            return
        ## push the queue to the workers
        for host in targets:
            self.pq.put(host)
        ## raised only after queueing, so that run() cannot see an empty
        ## queue and declare the round finished before it has begun
        self.active = True

    @staticmethod
    def _ping_command(target, wait, platform):
        """Return the argument list that pings ``target`` exactly once.

        ``wait`` is the reply timeout in seconds and ``platform`` a
        ``sys.platform`` value. The Windows branch converts ``wait`` to
        whole milliseconds for ``-w``.
        """
        if platform == 'win32':
            return ['ping', '-n', '1', '-w', str(int(wait * 1000)), target]
        return ['ping', '-c', '1', '-W', str(wait), target]

    @staticmethod
    def _parse_reply(output, platform):
        """Return the average round trip time in ``output`` as a string.

        Returns None unless the output reports 0% loss and also contains a
        timing line.
        """
        ## the lookbehind stops the trailing '0%' of '100%' from matching
        if not re.search(r'(?<![\d.])0(?:\.0+)?% (?:packet )?loss', output):
            return None
        if platform == 'win32':
            re_time = re.search(r'Average = (\d+)ms', output)
        else:
            re_time = re.search(r'min/avg[^=]*=\s*[\d.]+/([\d.]+)/', output)
        if re_time is None:
            return None
        return re_time.group(1)

    def _ping_once(self, target):
        """Ping ``target`` and report the result to the console and GUI."""
        try:
            ## argument list without a shell: text from the host file can
            ## never be interpreted as shell syntax
            output = subprocess.check_output(self._ping_command(target, timeout, sys.platform), stderr = subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            ## ping exits non-zero when there was no reply
            output = err.output
        except OSError as err:
            print('Unable to run ping: ' + str(err))
            output = b''

        ## Check the output
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')
        reply_time = self._parse_reply(output, sys.platform)

        ## we use queues for output to avoid race conditions
        now = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
        stamp = '\n[' + now + '] '
        if reply_time is not None:
            print(stamp + 'Reply from ' + target + ' (' + str(reply_time) + 'ms)')
            self.gui.update_time(target, 'OliveDrab1', now + ' (' + str(reply_time) + 'ms)')
        else:
            print(stamp + 'NO REPLY FROM ' + target)
            self.gui.update_time(target, 'coral1')

    def ping(self):
        """Worker loop: take targets off the queue and ping them."""
        while not self.terminate:
            target = self.pq.get()
            try:
                self._ping_once(target)
            except Exception as err:
                ## a worker must survive any single bad target
                print('Error pinging ' + str(target) + ': ' + str(err))
                self.gui.update_time(target, 'coral1')
            finally:
                ## always account for the item, otherwise join() in run()
                ## would block forever
                self.pq.task_done()
## start here
## Only open the window when run as a script, so that importing this file
## does not launch the GUI and block in the Tk main loop.
if __name__ == '__main__':
    gui = ui()
    gui.run()
@toxicantidote
Copy link
Author

Needs some more work to properly have the pinger thread use the tkinter events system.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment