Last active
November 3, 2020 17:47
-
-
Save jbylund/a4b19dcd5de9aad23de799dfaebf8db1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import json | |
import logging | |
import re | |
import time | |
import xml.etree.ElementTree | |
import traceback | |
import dateutil.parser | |
import pg8000 | |
import requests | |
from bs4 import BeautifulSoup | |
def tostr(iobj):
    """Return *iobj* unchanged if it is JSON-serialisable, else ``str(iobj)``.

    Serialisability is probed by attempting ``json.dumps``; the encoded text
    itself is discarded.

    Args:
        iobj: Any object.

    Returns:
        ``iobj`` itself when ``json.dumps(iobj)`` succeeds, otherwise its
        string representation.
    """
    try:
        json.dumps(iobj)
    except (TypeError, ValueError, RecursionError):
        # TypeError: unsupported type or key; ValueError: circular reference;
        # RecursionError: nesting too deep.  Anything else (KeyboardInterrupt,
        # SystemExit, ...) is deliberately allowed to propagate.
        return str(iobj)
    return iobj
class CableModemAPI:
    """Client for a modem's web admin pages.

    The class talks to the ``EventLog``, ``DocsisStatus`` and ``RouterStatus``
    pages under ``base_url`` using a ``requests`` session with HTTP basic
    auth, and can copy the modem's event log into a PostgreSQL table.

    Attributes are documented inline in ``__init__``.
    """

    # Runs of anything other than lowercase ASCII letters; used to turn table
    # header text (already lower-cased) into field names.
    _NON_ALPHA = re.compile(r"[^a-z]+")

    # Converters applied to table cells, keyed by normalised column name.
    # Columns without an entry are kept as the raw cell text.
    _FIELD_PARSERS = {
        "channel": int,
        "channel_id": int,
        # These cells carry a number followed by further text; only the first
        # whitespace-separated token is parsed.
        "frequency": lambda s: int(s.strip().split()[0]),
        "power": lambda s: float(s.strip().split()[0]),
        "snr_mer": lambda s: float(s.strip().split()[0]),
        "correctable_codewords": int,
        "uncorrectable_codewords": int,
        "unerrored_codewords": int,
    }

    def __init__(self, user="admin", password="xxx", base_url="http://192.168.100.1"):
        """Create the client and open a session immediately.

        Args:
            user: Username for HTTP basic auth.
            password: Password for HTTP basic auth.
            base_url: Scheme and host of the modem's web interface.
        """
        self.logger = logging.getLogger("CableModemAPI")
        self.session = None
        # Most recent webToken scraped from a page; None until a page has been
        # fetched through get_token().
        self.token = None
        self.password = password
        self.user = user
        self.base_url = base_url.rstrip("/")
        self.auth = (self.user, self.password)
        self.login()

    def _url(self, path):
        """Return the absolute URL for *path* (which starts with ``/``)."""
        return self.base_url + path

    def login(self):
        """(Re)create the HTTP session with basic-auth credentials attached.

        Any previous session is closed first so its pooled connections are
        released rather than leaked.
        """
        previous = getattr(self, "session", None)
        if previous is not None:
            previous.close()
        self.session = requests.Session()
        self.session.auth = self.auth

    def get_token(self, text):
        """Extract the ``webToken`` value from page source and remember it.

        The page is split on whitespace; the word following
        ``name="webToken"`` is taken and everything after its first ``=`` is
        the token.

        Args:
            text: Source of a page fetched from the modem.

        Returns:
            The token string (also stored on ``self.token``).

        Raises:
            ValueError: If ``name="webToken"`` does not occur in *text*.
            IndexError: If it is the last word of *text*.
        """
        words = text.split()
        token_idx = words.index('name="webToken"') + 1
        self.token = token = words[token_idx].partition("=")[2]
        return token

    def reboot(self):
        """Fetch a fresh token from ``RouterStatus.asp`` and post the reboot form.

        Raises:
            requests.HTTPError: If either request returns an error status.
        """
        response = self.session.get(self._url("/RouterStatus.asp"))
        response.raise_for_status()
        self.get_token(response.text)
        response = self.session.post(
            self._url("/goform/RouterStatus"),
            data={
                "buttonSelect": 2,
                "wantype": "dhcp",
                "enable_apmode": 0,
                "RsReboot": "",
                "RetailSessionId": "",
                "webToken": self.token,
            },
        )
        response.raise_for_status()

    def get_logs(self):
        """Download and parse the modem's event log.

        The XML document is embedded in ``EventLog.asp`` as a single-quoted
        string following the text ``xmlFormat``.  Each child of the XML root
        becomes one dict.

        Returns:
            A list of dicts, one per log entry.  Keys are the lower-cased
            element tags minus their first nine characters, except that the
            ``text`` key is renamed ``msg``.  Values of keys ending in
            ``time`` are ``datetime`` objects; all others are element text.

        Raises:
            requests.HTTPError: If the page request returns an error status.
            KeyError: If an entry has no ``text`` field.
        """
        self.logger.info("Fetching logs...")
        response = self.session.get(self._url("/EventLog.asp"))
        response.raise_for_status()
        # Refresh the token so a subsequent clear_log() is accepted.
        self.get_token(response.text)
        self.logger.info("Parsing logs...")
        after_marker = response.text.split("xmlFormat")[1]
        xml_data = after_marker.partition("'")[2].rpartition("'")[0]
        parsed = xml.etree.ElementTree.fromstring(xml_data)
        rows = []
        for row in parsed:
            # Drop the first nine characters of each tag -- presumably a fixed
            # prefix such as "docsDevEv"; TODO confirm against the modem's XML.
            as_dict = {elem.tag[9:].lower().strip(): elem.text for elem in row}
            for datefield in [name for name in as_dict if name.endswith("time")]:
                dateval = as_dict[datefield]
                try:
                    as_dict[datefield] = dateutil.parser.parse(dateval)
                except (ValueError, TypeError, OverflowError):
                    # Unparseable or missing timestamp: fall back to "now" so
                    # the row can still be stored.
                    self.logger.warning(
                        "Could not parse %s value %r; using current time",
                        datefield,
                        dateval,
                    )
                    as_dict[datefield] = datetime.datetime.now()
            as_dict["msg"] = as_dict.pop("text")
            rows.append(as_dict)
        self.logger.info("Got logs...")
        return rows

    def clear_log(self):
        """Ask the modem to erase its event log.

        If no token has been scraped yet, ``EventLog.asp`` is fetched first to
        obtain one.

        Raises:
            requests.HTTPError: If a request returns an error status.
        """
        self.logger.info("Purging log from modem...")
        if getattr(self, "token", None) is None:
            page = self.session.get(self._url("/EventLog.asp"))
            page.raise_for_status()
            self.get_token(page.text)
        post_data = {"clear_log": 1, "RetailSessionId": "", "webToken": self.token}
        response = self.session.post(
            self._url("/goform/EventLog"),
            data=post_data,
        )
        response.raise_for_status()
        self.logger.info("Purged logs...")

    def record(self):
        """Copy the modem's event log into ``homenet.cable_modem``, then clear it.

        The modem log is only cleared after the rows have been committed, so a
        database failure leaves the entries on the modem for the next run.
        """
        data = self.get_logs()
        insert_query = """
            INSERT INTO homenet.cable_modem
            ( counts, firsttime, id, index, lasttime, level, msg ) VALUES
            ( %(counts)s, %(firsttime)s, %(id)s, %(index)s, %(lasttime)s, %(level)s, %(msg)s )
        """.strip()
        self.logger.info("Recording %s logs in db...", len(data))
        # The query uses %(name)s placeholders, which need the pyformat style.
        pg8000.paramstyle = "pyformat"
        # NOTE(review): connection settings are hard-coded here; consider
        # moving them to configuration.
        with pg8000.connect(
            user="jbylund", database="jbylund", host="dionysus", password="magicpass"
        ) as conn:
            cursor = conn.cursor()
            for row in data:
                cursor.execute(insert_query, row)
            conn.commit()
        self.logger.info("%s logs recorded in db...", len(data))
        self.clear_log()

    def parse_table(self, page_soup, table_id):
        """Parse one HTML status table into a list of row dicts.

        The first ``<tr>`` supplies the column names: each ``<td>`` text is
        lower-cased and every run of non-letters replaced by ``_``.  Rows
        whose ``lock_status`` is ``"Not Locked"`` -- or that have no
        ``lock_status`` column at all -- are skipped.  Remaining cells are
        converted with ``_FIELD_PARSERS``.

        Args:
            page_soup: Parsed page supporting ``find("table", id=...)``.
            table_id: ``id`` attribute of the table to read.

        Returns:
            A list of dicts (without the ``lock_status`` key); empty if the
            table is missing or has no rows.

        Raises:
            ValueError: If a cell of a locked row cannot be converted.
        """
        html_table = page_soup.find("table", id=table_id)
        if html_table is None:
            self.logger.warning("Table %r not found in page", table_id)
            return []
        rows = html_table.find_all("tr")
        if not rows:
            return []
        fields = [
            self._NON_ALPHA.sub("_", icol.get_text().lower())
            for icol in rows[0].find_all("td")
        ]

        def ident(iarg):
            return iarg

        parsed = []
        for row in rows[1:]:
            row_info = dict(zip(fields, (c.get_text() for c in row.find_all("td"))))
            # Decide on lock status before converting, so placeholder values
            # in unlocked rows cannot raise.
            if row_info.pop("lock_status", "Not Locked") == "Not Locked":
                continue
            parsed.append(
                {k: self._FIELD_PARSERS.get(k, ident)(v) for k, v in row_info.items()}
            )
        return parsed

    @staticmethod
    def _uptime_seconds(label_text):
        """Convert uptime cell text to a single integer.

        Everything up to and including the first ``:`` is discarded; the
        remaining ``:``-separated integers are folded base 60 (so ``H:M:S``
        yields seconds).
        """
        total_uptime = 0
        for part in label_text.partition(":")[2].split(":"):
            total_uptime = 60 * total_uptime + int(part)
        return total_uptime

    def get_metrics(self):
        """Fetch ``DocsisStatus.asp`` and return channel tables plus uptime.

        The request is tried up to three times with a six-second timeout;
        after each failed attempt but the last, the session is re-created.

        Returns:
            A dict with keys ``down_bonded``, ``up_bonded``, ``down_ofdm`` and
            ``up_ofdma`` (each a dict of ``channel_type``, ``direction`` and
            ``data`` rows from ``parse_table``) and ``uptime`` (an int, see
            ``_uptime_seconds``).

        Raises:
            requests.RequestException: If all three attempts fail.
        """
        for attempts_left in (2, 1, 0):
            try:
                response = self.session.get(
                    self._url("/DocsisStatus.asp"), timeout=2 * 3
                )
                response.raise_for_status()
                break
            except requests.RequestException:
                self.logger.error(traceback.format_exc())
                if not attempts_left:
                    raise
                self.login()
        soup = BeautifulSoup(response.text, features="html.parser")
        # (result key, channel type, direction, HTML table id)
        layout = (
            ("down_bonded", "bonded", "downstream", "dsTable"),
            ("up_bonded", "bonded", "upstream", "usTable"),
            ("down_ofdm", "ofdm", "downstream", "d31dsTable"),
            ("up_ofdma", "ofdma", "upstream", "d31usTable"),
        )
        metrics = {}
        for key, channel_type, direction, table_id in layout:
            metrics[key] = {
                "channel_type": channel_type,
                "direction": direction,
                "data": self.parse_table(soup, table_id),
            }
        uptime_text = soup.find("td", id="SystemUpTime").get_text()
        metrics["uptime"] = self._uptime_seconds(uptime_text)
        return metrics

    def continulog(self):
        """Call ``record()`` once a minute, forever (sleeping before each call).

        Any exception raised by ``record()`` ends the loop and propagates.
        """
        while True:
            time.sleep(60)
            self.record()
def recursive_codewords(indata):
    """Sum every value stored under a key ending in ``codewords``.

    Dicts and lists are walked recursively; in a dict, a value whose key ends
    with ``"codewords"`` is added directly, and every other value is searched
    in turn.  Anything that is neither a dict nor a list contributes 0.

    Args:
        indata: Arbitrarily nested dicts/lists (e.g. a metrics structure).

    Returns:
        The total of all matching values.
    """
    if isinstance(indata, list):
        return sum(recursive_codewords(item) for item in indata)
    if not isinstance(indata, dict):
        return 0
    total = 0
    for name, value in indata.items():
        total += value if name.endswith("codewords") else recursive_codewords(value)
    return total
def main(mode="uptime"):
    """Configure logging, connect to the modem and run a monitoring loop.

    Args:
        mode: ``"uptime"`` (default) prints the modem uptime once a second;
            ``"codewords"`` prints the running codeword total and its change
            every three seconds.

    Neither loop returns; stop the program with Ctrl-C.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    api = CableModemAPI()
    if mode == "codewords":
        _watch_codewords(api)
    else:
        _watch_uptime(api)


def _watch_uptime(api):
    """Print ``api.get_metrics()["uptime"]`` once a second, forever."""
    while True:
        time.sleep(1)
        try:
            print(api.get_metrics()["uptime"])
        except Exception:
            # Keep polling after a failed fetch, but let KeyboardInterrupt and
            # SystemExit through so the loop can be stopped.
            print(traceback.format_exc())


def _watch_codewords(api):
    """Print the total codeword count and its delta every three seconds."""
    last = 0
    while True:
        time.sleep(3)
        metrics = api.get_metrics()
        total_codewords = recursive_codewords(metrics)
        delta = total_codewords - last
        last = total_codewords
        print("{:,}\t{:,}".format(total_codewords, delta))
# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment