Skip to content

Instantly share code, notes, and snippets.

@jbylund
Last active November 3, 2020 17:47
Show Gist options
  • Save jbylund/a4b19dcd5de9aad23de799dfaebf8db1 to your computer and use it in GitHub Desktop.
Save jbylund/a4b19dcd5de9aad23de799dfaebf8db1 to your computer and use it in GitHub Desktop.
import datetime
import json
import logging
import re
import time
import xml.etree.ElementTree
import traceback
import dateutil.parser
import pg8000
import requests
from bs4 import BeautifulSoup
def tostr(iobj):
try:
json.dumps(iobj)
return iobj
except:
return str(iobj)
class CableModemAPI:
def __init__(self):
self.logger = logging.getLogger("CableModemAPI")
self.session = None
self.password = "xxx"
self.user = "admin"
self.auth = (self.user, self.password)
self.login()
def login(self):
self.session = requests.Session()
self.session.auth = self.auth
return
login_page_get_response = self.session.get("http://192.168.100.1/GenieLogin.asp")
login_page_get_response.raise_for_status()
self.token = self.get_token(login_page_get_response.text)
login_post_response = self.session.post(
"http://192.168.100.1/goform/GenieLogin",
data={
"login": 1,
"loginPassword": "8oR*01$#ZLNAUapO3jvI",
"loginUsername": "admin",
"webToken": self.token,
},
)
login_post_response.raise_for_status()
def get_token(self, text):
words = text.split()
token_idx = words.index('name="webToken"') + 1
self.token = token = words[token_idx].partition("=")[2]
return token
def reboot(self):
response = self.session.get("http://192.168.100.1/RouterStatus.asp")
response.raise_for_status()
self.get_token(response.text)
response = self.session.post(
"http://192.168.100.1/goform/RouterStatus",
data={
"buttonSelect": 2,
"wantype": "dhcp",
"enable_apmode": 0,
"RsReboot": "",
"RetailSessionId": "",
"webToken": self.token,
},
)
response.raise_for_status()
def get_logs(self):
self.logger.info("Fetching logs...")
response = self.session.get("http://192.168.100.1/EventLog.asp")
response.raise_for_status()
self.get_token(response.text)
self.logger.info("Parsing logs...")
xml_data = (
response.text.split("xmlFormat")[1].partition("'")[2].rpartition("'")[0]
)
parsed = xml.etree.ElementTree.fromstring(xml_data)
rows = []
for row in parsed:
as_dict = {elem.tag[9:].lower().strip(): elem.text for elem in row}
for datefield, dateval in as_dict.items():
if datefield.endswith("time"):
try:
as_dict[datefield] = dateutil.parser.parse(dateval)
except:
as_dict[datefield] = datetime.datetime.now()
as_dict["msg"] = as_dict.pop("text")
rows.append(as_dict)
self.logger.info("Got logs...")
return rows
def clear_log(self):
self.logger.info("Purging log from modem...")
post_data = {"clear_log": 1, "RetailSessionId": "", "webToken": self.token}
response = self.session.post(
"http://192.168.100.1/goform/EventLog",
data=post_data,
)
response.raise_for_status()
self.logger.info("Purged logs...")
def record(self):
data = self.get_logs()
self.clear_log()
insert_query = """
INSERT INTO homenet.cable_modem
( counts, firsttime, id, index, lasttime, level, msg ) VALUES
( %(counts)s, %(firsttime)s, %(id)s, %(index)s, %(lasttime)s, %(level)s, %(msg)s )
""".strip()
self.logger.info("Recording %s logs in db...", len(data))
pg8000.paramstyle = "pyformat"
with pg8000.connect(
user="jbylund", database="jbylund", host="dionysus", password="magicpass"
) as conn:
cursor = conn.cursor()
for row in data:
cursor.execute(insert_query, row)
conn.commit()
self.logger.info("%s logs recorded in db...", len(data))
def parse_table(self, page_soup, table_id):
html_table = page_soup.find("table", id=table_id)
non_alpha = re.compile("[^a-z]+")
rows = html_table.find_all("tr")
header_row = rows[0]
fields = [
non_alpha.sub("_", icol.get_text().lower())
for icol in header_row.find_all("td")
]
def ident(iarg):
return iarg
process_map = {
"channel": int,
"channel_id": int,
"frequency": lambda s: int(s.strip().split()[0]),
"power": lambda s: float(s.strip().split()[0]),
"snr_mer": lambda s: float(s.strip().split()[0]),
"correctable_codewords": int,
"uncorrectable_codewords": int,
"unerrored_codewords": int,
}
parsed = []
for row in rows[1:]:
row_info = dict(zip(fields, (c.get_text() for c in row.find_all("td"))))
for k, v in row_info.items():
row_info[k] = process_map.get(k, ident)(v)
if row_info.pop("lock_status", "Not Locked") == "Not Locked":
continue
parsed.append(row_info)
return parsed
def get_metrics(self):
attempts = list(range(3))
while attempts:
attempts.pop()
try:
response = self.session.get("http://192.168.100.1/DocsisStatus.asp", timeout=2 * 3)
response.raise_for_status()
break
except:
self.logger.error(traceback.format_exc())
if not attempts:
raise
self.login()
soup = BeautifulSoup(response.text, features="html.parser")
downstream_table = self.parse_table(soup, "dsTable")
upstream_table = self.parse_table(soup, "usTable")
ds_ofdm_table = self.parse_table(soup, "d31dsTable")
us_ofdma_table = self.parse_table(soup, "d31usTable")
ds_hash = {
"channel_type": "bonded",
"direction": "downstream",
"data": downstream_table,
}
us_hash = {
"channel_type": "bonded",
"direction": "upstream",
"data": upstream_table,
}
ds_ofdm_hash = {
"channel_type": "ofdm",
"direction": "downstream",
"data": ds_ofdm_table,
}
us_ofdma_hash = {
"channel_type": "ofdma",
"direction": "upstream",
"data": us_ofdma_table,
}
uptime_node = soup.find("td", id="SystemUpTime").get_text().partition(":")[2]
bits = [int(x) for x in uptime_node.split(":")]
total_uptime = 0
for i in bits:
total_uptime = 60 * total_uptime + i
return {
"down_bonded": ds_hash,
"up_bonded": us_hash,
"down_ofdm": ds_ofdm_hash,
"up_ofdma": us_ofdma_hash,
"uptime": total_uptime,
}
def continulog(self):
while True:
time.sleep(60)
self.record()
def recursive_codewords(indata):
if isinstance(indata, dict):
this_layer = 0
for key, val in indata.items():
if key.endswith("codewords"):
this_layer += val
else:
this_layer += recursive_codewords(val)
return this_layer
elif isinstance(indata, list):
return sum(recursive_codewords(i) for i in indata)
return 0
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
api = CableModemAPI()
while True:
time.sleep(1)
try:
print(api.get_metrics()["uptime"])
except:
print(traceback.format_exc())
# api.continulog()
return
last = 0
while time.sleep(3) is None:
metrics = api.get_metrics()
total_codewords = recursive_codewords(metrics)
delta = total_codewords - last
last = total_codewords
print("{:,}\t{:,}".format(total_codewords, delta))
if "__main__" == __name__:
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment