Last active
February 9, 2022 20:25
-
-
Save HLSiira/b28132f8f443d551968ae764de8cc901 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, sys | |
import time | |
import json | |
import csv | |
import statistics | |
import subprocess | |
from datetime import datetime | |
from collections import namedtuple | |
from utils import hasFlag, cPrint, SCANID | |
# Debug switch, taken from the "d" command-line flag
DEBUG = hasFlag("d")

# Column titles written as the first row of each CSV
dailyHeader = ["Hour", "DateTime", "Ping", "Download", "Upload", "TestID"]
yearlyHeader = ["Day", "DateTime", "Ping", "Download", "Upload"]

# Base directory of the data files; when set it must end with "/"
dataLocation = ""

# Today's speed tests: daily/<month>-<day>.csv
csvToday = "{}daily/{}.csv".format(dataLocation, time.strftime("%m-%d"))

# Annual file: yearly/<two-digit year>.csv
csvYearly = "{}yearly/{}.csv".format(dataLocation, time.strftime("%y"))
def byteToHuman(bytes, to, bsize=1024):
    """Convert a byte count into a larger unit.

    Args:
        bytes: Number of bytes; anything ``float()`` accepts (int, float or
            numeric string). The name shadows the builtin but is kept so
            keyword callers keep working.
        to: Target unit prefix, one of "k", "m", "g", "t", "p", "e".
        bsize: Base of the unit scale (1024 by default).

    Returns:
        ``bytes / bsize ** exponent`` as a float.

    Raises:
        KeyError: If ``to`` is not one of the known prefixes.
    """
    exponents = {"k": 1, "m": 2, "g": 3, "t": 4, "p": 5, "e": 6}
    # The float conversion used to be computed and then ignored, so numeric
    # strings blew up in the division; use the converted value.
    return float(bytes) / (bsize ** exponents[to])
def byteToBits(bytes, to, bsize=125):
    """Convert a byte count into kilo-/mega-/giga-... bits, rounded to 2 places.

    Args:
        bytes: Number of bytes; anything ``float()`` accepts. The name shadows
            the builtin but is kept so keyword callers keep working.
        to: Target unit prefix, one of "k", "m", "g", "t", "p", "e".
        bsize: Bytes per kilobit (125 bytes = 1000 bits).

    Returns:
        The converted value as a float rounded to two decimals.

    Raises:
        KeyError: If ``to`` is not one of the known prefixes.
    """
    # Number of kilobits in one target unit. The table used to mix multipliers
    # ("k": 1, "m": 1000) with exponents ("g": 3, "t": 4, ...), which made
    # every unit above "m" wrong; all entries are multipliers now.
    kilobits_per_unit = {
        "k": 1,
        "m": 1000,
        "g": 1000 ** 2,
        "t": 1000 ** 3,
        "p": 1000 ** 4,
        "e": 1000 ** 5,
    }
    return round(float(bytes) / (bsize * kilobits_per_unit[to]), 2)
def rowFormat(text):
    """Lay out one row as a single padded, "|"-separated string.

    A 5-item row uses the yearly layout; anything else uses the daily layout,
    which appends a sixth, unpadded column. The result is a one-element list
    so that ``csv.writer.writerow`` emits the line as a single field.
    """
    layout = "{:^5} |{:^14} |{:^10} |{:^12} |{:^10}"
    if len(text) != 5:
        # Daily rows carry a trailing sixth column
        layout += " |{}"
    return [layout.format(*text)]
def genFile(path, header, length):
    """Create (or overwrite) a placeholder CSV file.

    The file gets the formatted header, ``length - 1`` numbered rows filled
    with zeros, and a final "AVG" row.

    Args:
        path: Destination file; its parent directory is created if missing.
        header: Column titles. A 5-item header produces 5-column rows, any
            other header produces 6-column rows.
        length: Number of data rows including the trailing "AVG" row.
    """
    columns = 5 if len(header) == 5 else 6
    blanks = [0] * (columns - 1)

    # open() fails when daily/ or yearly/ does not exist yet
    folder = os.path.dirname(path)
    if folder:
        os.makedirs(folder, exist_ok=True)

    # newline="" is what the csv module asks for; without it Windows writes
    # "\r\r\n" line endings.
    with open(path, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(rowFormat(header))
        for number in range(1, length):
            writer.writerow(rowFormat([number] + blanks))
        # The AVG row used to be 5 columns wide even in the 6-column daily
        # file, so reading that file back into a 6-field tuple failed.
        writer.writerow(rowFormat(["AVG"] + blanks))
def readCSV(path, testTuple):
    """Load every line of a "|"-delimited file as a ``testTuple`` instance.

    Each line is split on "|" and its fields are passed positionally to
    ``testTuple``; fields keep whatever padding they have in the file.
    """
    with open(path) as handle:
        return [testTuple(*fields) for fields in csv.reader(handle, delimiter="|")]
# Create the daily and yearly files if they don't exist already.
# In debug mode both files are regenerated on every run, overwriting whatever
# they contained.
if DEBUG or not os.path.isfile(csvToday):
    if DEBUG:
        print("Generating daily file")
    # 24 hourly rows plus the trailing AVG row
    genFile(csvToday, dailyHeader, 25)

if DEBUG or not os.path.isfile(csvYearly):
    if DEBUG:
        print("Generating yearly file")
    # NOTE(review): 365 yields 364 day rows plus the AVG row - confirm whether
    # 366 was intended (by analogy with 25 = 24 hours + AVG).
    genFile(csvYearly, yearlyHeader, 365)

# Row types for the two files
testHourly = namedtuple("testHourly", "hour datetime ping download upload testID")
# The typename used to be "testHourly" here as well (copy/paste slip)
testDaily = namedtuple("testDaily", "day datetime ping download upload")

testsToday = readCSV(csvToday, testHourly)
# This call used to pass testsYearly - the very name being assigned - as the
# row type, which raised NameError; the yearly rows are testDaily tuples.
testsYearly = readCSV(csvYearly, testDaily)
# Obtain one speed test result as a parsed JSON object.
results = None
try:
    if hasFlag("n"):
        # Pull from a sample JSON; the context manager closes the file
        with open("speedtest.json", "r") as sample:
            results = json.load(sample)
    else:
        # Run the speedtest CLI and use live data. An argument list avoids
        # going through a shell, and run() waits for the process to finish.
        completed = subprocess.run(
            ["/usr/bin/speedtest", "-f", "json"],
            stdout=subprocess.PIPE,
        )
        results = json.loads(completed.stdout.decode("utf-8"))
except (OSError, ValueError):
    # Missing sample file or binary, or output that is not valid JSON
    # (json.JSONDecodeError and UnicodeDecodeError are both ValueErrors).
    # Fall through to the error exit below instead of dying with a traceback.
    results = None

# Early exit if something is wrong with the data
if not results:
    cPrint("Error in retrieving data")
    sys.exit(1)

try:
    ping = str(round(results["ping"]["latency"], 2))
    download = byteToBits(results["download"]["bandwidth"], "m")
    upload = byteToBits(results["upload"]["bandwidth"], "m")
    testID = results["result"]["id"]
except (KeyError, TypeError):
    # Valid JSON that lacks the expected result fields
    cPrint("Error in retrieving data")
    sys.exit(1)

cPrint(f'P{ping}, D{download}, U{upload} - {testID}')
# The script deliberately stops here; everything after this line is never run.
sys.exit(0)
# NOTE(review): nothing from here to the end of the script is ever executed -
# the unconditional exit just above terminates the process first.
# NOTE(review): this section uses names that are not defined anywhere in the
# visible source (TODAY, SpeedTest, header, HISTORY, csvAnnual); it would raise
# NameError if it were reached.
# NOTE(review): these lines carry no indentation, so the extent of each
# with/for/if/try body below cannot be read from the layout.
# Generate arrays for Today's tests and the yearly | |
TODAY.append(SpeedTest(SCANID, ping, download, upload, testID)) | |
# Rewrite today's CSV: one header line, then one line per entry in TODAY.
# split("|", 0) performs no split; it only wraps the formatted string in a
# one-element list for writerow().
with open(csvToday, "w") as csvfile: | |
writer = csv.writer(csvfile) | |
header = "{:^14} |{:^10} |{:^12} |{:^10}".format(*header).split("|", 0) | |
writer.writerow(header) | |
for speedtest in TODAY: | |
speedtest = "{:^14} |{:>10} |{:>12} |{:>10}".format(*speedtest).split("|", 0) | |
writer.writerow(speedtest) | |
exit(0) | |
# Daily summary: attempted in debug mode or once the local hour is 23.
if DEBUG or int(time.strftime("%H")) >= 23: | |
Summary = namedtuple( | |
"Summary", | |
( | |
"date average_ping lowest_download average_download highest_download lowest_upload average_upload highest_upload" | |
), | |
) | |
# Mean ping plus min / mean / max of download and upload over TODAY,
# each rounded to two decimals.
SUMMARY = Summary( | |
time.strftime("%m-%d"), | |
round(statistics.mean([float(test.ping) for test in TODAY]), 2), | |
round(min([float(test.download) for test in TODAY]), 2), | |
round(statistics.mean([float(test.download) for test in TODAY]), 2), | |
round(max([float(test.download) for test in TODAY]), 2), | |
round(min([float(test.upload) for test in TODAY]), 2), | |
round(statistics.mean([float(test.upload) for test in TODAY]), 2), | |
round(max([float(test.upload) for test in TODAY]), 2), | |
) | |
# The bare expression on the next line has no effect.
SUMMARY | |
# SUMMARY is rebound from the namedtuple to a 4-item list whose download and
# upload entries are "lowest/average/highest" strings.
SUMMARY = [ | |
SUMMARY.date, | |
SUMMARY.average_ping, | |
f"{SUMMARY.lowest_download}/{SUMMARY.average_download}/{SUMMARY.highest_download}", | |
f"{SUMMARY.lowest_upload}/{SUMMARY.average_upload}/{SUMMARY.highest_upload}", | |
] | |
HISTORY.append(SUMMARY) | |
# Append the existing annual rows (first line skipped) to HISTORY.
# NOTE(review): st_size >= 0 holds for any file that exists, and the bare
# except silently swallows every error, including the NameError for csvAnnual.
try: | |
if os.stat(csvAnnual).st_size >= 0: | |
with open(csvAnnual) as csvfile: | |
readCSV = csv.reader(csvfile, delimiter="|") | |
next(readCSV) | |
for row in readCSV: | |
HISTORY.append(row) | |
except: | |
pass | |
# Rewrite the annual file: header line, then one line per HISTORY entry.
with open(csvAnnual, "w") as csvfile: | |
writer = csv.writer(csvfile) | |
header = [ | |
"Date", | |
"Avg Ping(ms)", | |
"Download Speeds(Mb/s)", | |
"Upload Speeds(Mb/s)", | |
] | |
header = "{:^7}|{:>12}|{:>22}|{:>20}".format(*header) | |
writer.writerow(header.split("|", 0)) | |
for speedtest in HISTORY: | |
speedtest = "{:^7}|{:^12}|{:^22}|{:^20}".format(*speedtest) | |
writer.writerow(speedtest.split("|", 0)) | |
# Threshold check: download below 60 or upload below 5.
# NOTE(review): SpeedTest is called with four arguments here but with five
# above, and no alert-sending call is visible despite the message text.
if float(download) < 60 or float(upload) < 5: | |
speed = SpeedTest(SCANID, ping, download, upload) | |
if DEBUG: | |
print(speed) | |
else: | |
cPrint(f"Connection speeds outside of defined bounderies...Sending alert...") | |
else: | |
cPrint(f"Connection speeds within defined bounderies") | |
exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"type":"result", | |
"timestamp":"2021-05-15T20:14:48Z", | |
"ping":{ | |
"jitter":2.601, | |
"latency":15.789999999999999 | |
}, | |
"download":{ | |
"bandwidth":8131860, | |
"bytes":109749712, | |
"elapsed":15013 | |
}, | |
"upload":{ | |
"bandwidth":6717305, | |
"bytes":89707944, | |
"elapsed":13315 | |
}, | |
"packetLoss":0, | |
"isp":"WideOpenWest", | |
"interface":{ | |
"internalIp":"192.168.1.1", | |
"name":"eno1", | |
"macAddr":"00:00:00:00:00:00", | |
"isVpn":false, | |
"externalIp":"00.00.000.000" | |
}, | |
"server":{ | |
"id":15775, | |
"name":"CarolinaConnect", | |
"location":"Lexington, SC", | |
"country":"United States", | |
"host":"speedtest.carolinaconnect.com", | |
"port":8080, | |
"ip":"162.213.116.4" | |
}, | |
"result":{ | |
"id":"null", | |
"url":"null" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, sys | |
import socket | |
from subprocess import Popen, PIPE | |
from datetime import datetime | |
# Upper-cased hostname of the machine running the script
host = str.upper(socket.gethostname())
# File name of the running script, without its directory part
name = os.path.split(sys.argv[0])[1]
# SCANID doubles as the run's date/time stamp (YYYYmmddHHMM)
SCANID = format(datetime.now(), "%Y%m%d%H%M")
def cPrint(msg):
    """Print ``msg`` prefixed with the script name and the scan ID."""
    line = "{}: {} {}".format(name, SCANID, msg)
    print(line)
def hasFlag(flg):
    """Report whether ``flg`` occurs in the first command-line argument.

    Only ``sys.argv[1]`` is inspected, using a substring test; the result is
    False when the script was started without arguments.
    """
    arguments = sys.argv[1:]
    if not arguments:
        return False
    return flg in arguments[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment