Skip to content

Instantly share code, notes, and snippets.

@creamidea
Last active February 4, 2017 14:29
Show Gist options
  • Save creamidea/f18b8a959d52a4b99064e9f02ea3991b to your computer and use it in GitHub Desktop.
Save creamidea/f18b8a959d52a4b99064e9f02ea3991b to your computer and use it in GitHub Desktop.
测试 IP 联通质量
#!/usr/local/bin/python3
#-*- encoding:utf-8 -*-
from operator import itemgetter
from queue import Queue
from threading import Thread
from subprocess import STDOUT, check_output, CalledProcessError
from collections import OrderedDict
import re
class PingTask(Thread):
    """Worker thread that pings one target and reports its summary statistics.

    Each instance takes exactly one ``[ip, results]`` item from ``tasks``,
    runs the system ``ping`` command against ``ip`` and puts a single dict on
    the ``results`` queue:

    * success: ``target``, ``transmitted``, ``received``, ``loss``, ``min``,
      ``avg``, ``max`` and ``stddev``;
    * failure: ``target``, ``code`` and ``message`` (raw ping output).

    The thread is a daemon and starts itself from ``__init__``.
    """

    # Summary header, e.g. "--- 111.206.223.205 ping statistics ---".
    p0 = re.compile(r'--- (?P<target>([0-9.]+?)) ping statistics ---')
    # Packet counters. Accepts both the BSD/macOS wording
    # ("N packets received, 36.4% packet loss") and the iputils wording
    # ("N received, 0% packet loss"); the decimal point is a literal dot and
    # the fractional part is optional.
    p1 = re.compile(
        r'(?P<transmitted>\d+) packets transmitted, '
        r'(?P<received>\d+) (?:packets )?received,'
        r'.*?(?P<loss>\d+(?:\.\d+)?)% packet loss'
    )
    # Round-trip timings. BSD/macOS prints "round-trip .../stddev", iputils
    # prints "rtt .../mdev"; the last figure is stored as ``stddev`` either way.
    p2 = re.compile(
        r'(?:round-trip|rtt) min/avg/max/(?:stddev|mdev) = '
        r'(?P<min>\d+(?:\.\d+)?)/(?P<avg>\d+(?:\.\d+)?)/'
        r'(?P<max>\d+(?:\.\d+)?)/(?P<stddev>\d+(?:\.\d+)?) ms'
    )

    def __init__(self, tasks):
        """Create the worker and start it immediately.

        Args:
            tasks: queue whose items are ``[ip, results]`` pairs, where
                ``results`` is the queue that receives the outcome dict.
        """
        super().__init__()
        self.tasks = tasks
        self.daemon = True
        self.start()

    def convert(self, value):
        """Turn a numeric string into ``float`` (if it has a dot) or ``int``.

        Values that cannot be converted are returned unchanged.
        """
        try:
            return float(value) if '.' in value else int(value)
        except (TypeError, ValueError):
            return value

    def _match(self, pattern, retval, what):
        """Search ``retval`` with ``pattern``; raise ValueError when absent."""
        found = pattern.search(retval)
        if found is None:
            raise ValueError(
                'ping output has no {what} line'.format(what=what)
            )
        return found

    def _getTarget(self, retval):
        """Return ``{"target": ...}`` taken from the statistics header."""
        g0 = self._match(self.p0, retval, 'statistics header')
        return {"target": g0.group('target')}

    def _getLoss(self, retval):
        """Return the transmitted / received / loss figures as numbers."""
        g1 = self._match(self.p1, retval, 'packet loss')
        return {
            name: self.convert(g1.group(name))
            for name in ('transmitted', 'received', 'loss')
        }

    def _getSpeed(self, retval):
        """Return the min / avg / max / stddev round-trip times as numbers."""
        g2 = self._match(self.p2, retval, 'round-trip')
        return {
            name: self.convert(g2.group(name))
            for name in ('min', 'avg', 'max', 'stddev')
        }

    def parse(self, retval):
        """Parse the text of a ping summary into a flat dict.

        Args:
            retval: decoded output of the ``ping`` command.

        Returns:
            dict with ``target``, ``transmitted``, ``received``, ``loss``,
            ``min``, ``avg``, ``max`` and ``stddev``.

        Raises:
            ValueError: if one of the three summary lines is missing.
        """
        info = {}
        info.update(self._getTarget(retval))
        info.update(self._getLoss(retval))
        info.update(self._getSpeed(retval))
        return info

    def run(self):
        """Ping one queued target and put exactly one outcome on its queue."""
        ip, results = self.tasks.get()
        try:
            print('>>> Start ping {ip}...'.format(ip=ip))
            # 20 probes (-c20), summary-only output (-q), 0.1 s between
            # probes (-i0.1). NOTE(review): the unit of -W differs between
            # ping implementations -- confirm for the target platform.
            raw = check_output(
                ["ping", "-c20", "-q", "-i0.1", "-W1", ip], stderr=STDOUT
            )
        except CalledProcessError as e:
            # Non-zero exit: keep whatever ping printed plus its exit code.
            results.put(
                {"target": ip, "code": e.returncode, "message": e.output}
            )
        else:
            try:
                info = self.parse(raw.decode('utf-8', errors='replace'))
            except ValueError:
                # ping exited with 0 but its summary was not recognised;
                # record it as a failure instead of killing the thread.
                results.put({"target": ip, "code": 0, "message": raw})
            else:
                results.put(info)
        finally:
            # Always release the task so that tasks.join() can return.
            self.tasks.task_done()
def ReadIPs(filename = 'ips.txt'):
    """Yield, exactly once, the list of ping targets stored in ``filename``.

    The file is read as UTF-8 (a leading BOM is tolerated) with one target per
    line. Surrounding whitespace, including the ``\\r`` of CRLF files, is
    stripped and blank lines are dropped, so a trailing newline no longer
    produces an empty target.

    Args:
        filename: path of the target list; defaults to ``ips.txt``.

    Yields:
        list[str]: all non-empty targets, in file order.
    """
    with open(filename, 'rb') as f:
        text = f.read().decode('utf-8-sig')
    # The file is already closed here; callers fetch the list with next().
    yield [line.strip() for line in text.splitlines() if line.strip()]
def handleResults(results):
    """Print the successful ping results, best connection first.

    Entries carrying a ``"code"`` key (failed pings) are ignored; the rest
    are ordered by ``loss``, then ``avg``, ``min`` and ``max`` and handed to
    ``printInfo``.

    Args:
        results: queue of outcome dicts filled by the ping workers.
    """
    # Snapshot the queue's underlying container without consuming it.
    snapshot = list(results.queue)
    ranked = sorted(
        (entry for entry in snapshot if "code" not in entry),
        key=itemgetter('loss', 'avg', 'min', 'max'),
    )
    printInfo(ranked)
def printInfo(results):
    """Print one numbered line per result.

    Each line shows, in order, the loss, average, minimum and maximum values
    in square brackets followed by the target in angle brackets.

    Args:
        results: iterable of dicts with the keys ``target``, ``loss``,
            ``avg``, ``min`` and ``max``.
    """
    template = "{index}: [{loss}]\t[{avg}]\t[{min}]\t[{max}]:\t<{target}>"
    shown = ("target", "loss", "avg", "min", "max")
    for position, record in enumerate(results):
        fields = {key: record[key] for key in shown}
        fields["index"] = position
        print(template.format(**fields))
if __name__ == '__main__':
    # Script entry point: ping every target listed in ips.txt concurrently
    # (one worker thread per target) and print the reachable ones ranked by
    # packet loss and latency.
    filename = 'ips.txt'
    ips = ReadIPs(filename)
    # Drop blank entries (e.g. from a trailing newline) and stray whitespace
    # so that no worker is spawned to ping an empty target.
    targets = [entry.strip() for entry in next(ips) if entry.strip()]
    count = len(targets)
    tasks = Queue(count)
    results = Queue(count)
    for target in targets:
        tasks.put([target, results])
        # The worker starts itself and consumes exactly one queued task.
        PingTask(tasks)
    # Block until every worker has called task_done() on its task.
    tasks.join()
    handleResults(results)
# a = '''
# --- 111.206.223.205 ping statistics ---
# 11 packets transmitted, 11 packets received, 36.4% packet loss
# round-trip min/avg/max/stddev = 2325.497/3858.896/5928.119/1244.209 ms
# '''
# print(a)
# b = "100 packets transmitted"
# print(p1.search(a).groups())
# print(p2.search(a).groups())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment