Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Last active April 12, 2024 18:56
Show Gist options
  • Star 22 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save sourceperl/10288663 to your computer and use it in GitHub Desktop.
Save sourceperl/10288663 to your computer and use it in GitHub Desktop.
Python script for do multi-threaded ping
#!/usr/bin/env python
# ping a list of host with threads for increase speed
# use standard linux /bin/ping utility
from threading import Thread
import subprocess
try:
import queue
except ImportError:
import Queue as queue
import re
# some global vars
num_threads = 15
ips_q = queue.Queue()
out_q = queue.Queue()
# build IP array
ips = []
for i in range(1,200):
ips.append("192.168.0."+str(i))
# thread code : wraps system ping command
def thread_pinger(i, q):
"""Pings hosts in queue"""
while True:
# get an IP item form queue
ip = q.get()
# ping it
args=['/bin/ping', '-c', '1', '-W', '1', str(ip)]
p_ping = subprocess.Popen(args,
shell=False,
stdout=subprocess.PIPE)
# save ping stdout
p_ping_out = str(p_ping.communicate()[0])
if (p_ping.wait() == 0):
# rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
p_ping_out, re.M|re.I)
ping_rtt = search.group(2)
out_q.put("OK " + str(ip) + " rtt= "+ ping_rtt)
# update queue : this ip is processed
q.task_done()
# start the thread pool
for i in range(num_threads):
worker = Thread(target=thread_pinger, args=(i, ips_q))
worker.setDaemon(True)
worker.start()
# fill queue
for ip in ips:
ips_q.put(ip)
# wait until worker threads are done to exit
ips_q.join()
# print result
while True:
try:
msg = out_q.get_nowait()
except queue.Empty:
break
print(msg)
#!/usr/bin/env python
# ping a list of host with threads for increase speed
# design to use data from/to SQL database
# use standard linux /bin/ping utility
from threading import Thread
import mysql.connector
import subprocess
try:
import queue
except ImportError:
import Queue as queue
import time
import re
# some global vars
num_threads = 30
ips_q = queue.Queue()
out_q = queue.Queue()
# thread code : wraps system ping command
def thread_pinger(i, q):
"""Pings hosts in queue"""
while True:
# get an IP item form queue
item = q.get()
# ping it
args=['/bin/ping', '-c', '1', '-W', str(item['timeout']),
str(item['ip'])]
p_ping = subprocess.Popen(args,
shell=False,
stdout=subprocess.PIPE)
# save ping stdout
p_ping_out = str(p_ping.communicate()[0])
# ping return 0 if up
if (p_ping.wait() == 0):
# rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
p_ping_out, re.M|re.I)
item['up'] = True
item['rtt'] = search.group(2)
else:
item['up'] = False
# update output queue
out_q.put(item)
# update queue : this ip is processed
q.task_done()
# start the thread pool
for i in range(num_threads):
worker = Thread(target=thread_pinger, args=(i, ips_q))
worker.setDaemon(True)
worker.start()
# build IP array
ips = []
for i in range(1,200):
ips.append("192.168.0."+str(i))
# main loop
while True:
# retreive data from DB
# add SQL here
# test start time
start = time.time()
# fill queue
for ip in ips:
ips_q.put({'ip': ip, 'timeout': 1})
# wait until worker threads are done to exit
ips_q.join()
# display result
print("next:")
while True:
try:
msg = out_q.get_nowait()
except queue.Empty:
break
if msg['up']:
print(msg)
# test start end
end = time.time()
loop_time = round(end - start, 2)
print("loop time: %s" % (loop_time))
# update DB
#add SQL here
# wait 5s before next cycle
time.sleep(5.0)
@themegabyte
Copy link

will this work on windows?

@sourceperl
Copy link
Author

no, it will not work on windows

@NiroDev26
Copy link

hello, how would i go about modifying it for windows?
also, what is the difference in using queue here rather than async?
thanks

@adrianyorke
Copy link

adrianyorke commented Oct 26, 2023

Thanks for this script @sourceperl

For those asking about Windows, you change the subprocess args like this:
args = ["PING.EXE", "-n", "1", "-w", "1", str(ip)]

Also replace this block of code with the following:

    if p_ping.wait() == 0:
        # Minimum = 1ms, Maximum = 1ms, Average = 1ms
        search = re.search(
            "Minimum = (.*)ms, Maximum = (.*)ms, Average = (.*)ms",
            p_ping_out,
            re.M | re.I,
        )
        ping_rtt = search.group(3)
        out_q.put(f"OK {str(ip)} rtt (avg)={ping_rtt}ms")

@themegabyte
Copy link

@adrianrjh
Copy link

adrianrjh commented Apr 12, 2024

Hello @sourceperl, thank's for the code, i'm coding this

from threading import Thread, Lock
_db_lock = Lock()
import threading, time
import subprocess
import queue
import re
from redistimeseries.client import Client
from redis import StrictRedis, ConnectionError
import json
import sys

import os
import time, json

# some global vars
num_threads = 15
ips_q = queue.Queue()
out_qUp = queue.Queue()
out_qLow = queue.Queue()
ipRedis = '192.168.1.100'

def decode_redis(src):
    if isinstance(src, list):
        rv = list()
        for key in src:
            rv.append(decode_redis(key))
        return rv
    elif isinstance(src, dict):
        rv = dict()
        for key in src:
            rv[key.decode()] = decode_redis(src[key])
        return rv
    elif isinstance(src, bytes):
        return src.decode()
    else:
        raise Exception("type not handled: " +type(src))

def doQuerySUBS():
    global r
    subs = []
    json_datos = {}
    band = 0
    print("Get data subscribers...")
    data = decode_redis(r.hgetall('infraYI'))
    if data == {}:
        data = 'NO'
    else:
        data = json.loads(data['data'])
        pass
    for sub in range(len(data)):
        if data[sub][0] != '' and data[sub][9] != '' and data[sub][10] != '' and data[sub][11] != '':
            subs.append((data[sub][0], data[sub][1], data[sub][2], data[sub][3], 
                         data[sub][4], data[sub][5], data[sub][6], data[sub][7], 
                         data[sub][8], data[sub][9], data[sub][10], data[sub][11], 
                         data[sub][12], data[sub][13], data[sub][14], data[sub][15]))
        else:
            pass
    return subs

try:
    rts = Client(host=ipRedis,port=6379,socket_keepalive=True,retry_on_timeout=True)    
except Exception as e:
    print(e)

try:
    r = StrictRedis(host=ipRedis,port=6379,db=0,health_check_interval=30,socket_keepalive=True)
except Exception as e:
    print(e)

# thread code : wraps system ping command
def thread_pinger(i, q):
  """Pings hosts in queue"""
  p_ping_outS = []
  while True:
    # get an IP item form queue
    ip = q.get()
    # ping it
    args=['/bin/ping', '-c', '1', '-W', '1', str(ip)]
    p_ping = subprocess.Popen(args, close_fds=True, shell=False, stdout=subprocess.PIPE)
    # save ping stdout
    p_ping_out = p_ping.communicate()[0].decode('utf-8')
    ######## DEVICES UṔ ########
    if (p_ping.wait() == 0):
      search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',p_ping_out, re.M|re.I)
      ping_rtt = search.group(2)
      out_qUp.put("UP " + str(ip) + " rtt= "+ ping_rtt+' ms')
      try:
        rts.add(str(ip), int(time.time()), float(ping_rtt))
        print(out_qUp.get_nowait())
      except Exception as e:
        print("[ERROR IP UP]########### rts.create "+str(e))
        #print(out_qUp.get_nowait())
    ######## DEVICES DOWN ########
    if (p_ping.wait() != 0):
      p_ping_outS = p_ping_out.split(' ')
      try:
        rts.add(str(p_ping_outS[1]), int(time.time()), float(0.0))
        print("DOWN "+str(p_ping_outS[1])+' 0.0'+' ms')
      except Exception as e:
        if len(p_ping_outS) > 1:
          print("[ERROR IP DOWN]########### rts.create "+str(e))
        else:
          pass
    time.sleep(1)

    # update queue : this ip is processed 
    q.task_done()

class Listener1(threading.Thread):
    def __init__(self, r, channels):
        threading.Thread.__init__(self)
        self.redis,self.init = r,0
        self.pubsub = self.redis.pubsub()
        print('Listener1...')
        try:
            self.pubsub.subscribe(channels)
        except Exception as e:
            print(e)

    def work(self):
        try:
          ips = []
          ipSubs = doQuerySUBS()
          for x in range(0,len(ipSubs)):
            ips.append(ipSubs[x][9])
          #start the thread pool
          for i in range(num_threads):
            worker = Thread(target=thread_pinger, args=(i, ips_q))
            worker.setDaemon(True)
            worker.start()
          # fill queue
          for ip in ips:
            ips_q.put(ip)
          # wait until worker threads are done to exit    
          ips_q.join()
        except Exception as e:
            print(e)
        time.sleep(5)

    def run(self):
        while True:
            try:
                self.work()
            except ConnectionError:
                print('[lost connection]')
                while True:
                    print('trying to reconnect...')
                    try:
                        self.redis.ping()
                    except ConnectionError:
                        time.sleep(10)
                    else:
                        self.pubsub.subscribe(['last_session'])
                        break
            time.sleep(0.001)  # be nice to the system :)

client = Listener1(r, ['last_session','LT01TP0LT'])
client.start()

.... for making your example th_pinger.py every 5 seconds, but after minutes appear this error:

OSError: [Errno 24] Too many open files
  File "/usr/lib/python3.8/subprocess.py", line 1605, in _execute_child
    errpipe_read, errpipe_write = os.pipe()
OSError: [Errno 24] Too many open files

.... you know why occur it?

@sourceperl
Copy link
Author

Unfortunately not, do you have the same problem with basic code like this one: https://gist.github.com/sourceperl/0ef3719e8fef2c95d98c590ff1e7cefd ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment