Skip to content

Instantly share code, notes, and snippets.

@jceloria
Created May 21, 2018 19:37
Show Gist options
  • Save jceloria/36290e0067f326121528fe15a9ed3ba3 to your computer and use it in GitHub Desktop.
Save jceloria/36290e0067f326121528fe15a9ed3ba3 to your computer and use it in GitHub Desktop.
Telegraf input plugin for Arris TM1602AP2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################################################################
"""
Telegraf input plugin for Arris TM1602AP2
Copyright © 2018 by John Celoria.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
########################################################################################################################
# Library imports
from __future__ import print_function
from bs4 import BeautifulSoup
import argparse
import json
import re
import sys
try:
# For Python 3.0 and later
from urllib.request import urlopen
except ImportError:
# Fall back to Python 2's urllib2
from urllib2 import urlopen
########################################################################################################################
# Set some defaults
__author__ = 'John Celoria'
__version__ = '0.1'
MODEM_URL = 'http://192.168.100.1/cgi-bin/status_cgi'
########################################################################################################################
def getUpstreamInformation(html, channels):
# Empty list and dictionary
AllChannelInfo = []
CurrentChannelInfo = {}
# Loop through all detected channels
for idx in range(0, channels):
# Scrape information from HTML tables
ChannelID = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[1]
Frequency = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[2]
PowerLevel = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[3]
SymbolRate = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[5]
Modulation = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[6]
# Put information from current channel into dictionary
CurrentChannelInfo = {
"Channel ID": ChannelID.text.encode('ascii', "ignore").strip(),
"Frequency": Frequency.text.encode("ascii", "ignore").strip(),
"Power Level": PowerLevel.text.encode("ascii", "ignore").strip(),
"Symbol Rate": SymbolRate.text.encode("ascii", "ignore").strip(),
"Modulation": Modulation.text.encode("ascii", "ignore").strip()
}
# Add dictionary to list
AllChannelInfo.append(CurrentChannelInfo)
return AllChannelInfo
def getDownstreamInformation(html, channels):
# Empty list and dictionary
AllChannelInfo = []
CurrentChannelInfo = {}
# Loop through all detected channels
for idx in range(0, channels):
# Scrape information from HTML tables
ChannelID = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[1]
Frequency = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[2]
PowerLevel = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[3]
SNR = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[4]
Modulation = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[5]
CorrectableCodewords = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[7]
UncorrectableCodewords = html.findAll("tr")[idx + 1].findAll("td", recursive=False)[8]
# Put information from current channel into dictionary
CurrentChannelInfo = {
"Channel ID": ChannelID.text.encode('ascii', "ignore").strip(),
"Frequency": Frequency.text.encode("ascii", "ignore").strip(),
"Power Level": PowerLevel.text.encode("ascii", "ignore").strip(),
"SNR": SNR.text.encode("ascii", "ignore").strip(),
"Modulation": Modulation.text.encode("ascii", "ignore").strip(),
"Correctable Codewords": CorrectableCodewords.text.encode("ascii", "ignore").strip(),
"Uncorrectable Codewords": UncorrectableCodewords.text.encode("ascii", "ignore").strip()
}
# Add dictionary to list
AllChannelInfo.append(CurrentChannelInfo)
return AllChannelInfo
def normalize(obj):
# Replace spaces w/ underscores, lowercase dictionary keys and strip/convert values to integers
return [{k.replace(" ", "_").lower(): float(re.sub("[^0-9\.-]", "", v)) for k, v in d.items()} for d in obj]
def main(arguments):
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("-d", "--downstream", help="Return downstream information", action="store_true")
parser.add_argument("-u", "--upstream", help="Return upstream information", action="store_true")
parser.add_argument("-r", "--raw", help="Display unmodified raw data", action="store_true")
args = parser.parse_args(arguments)
if not (args.downstream or args.upstream):
parser.error('Please specify one of -d or -u options')
# Request status_cgi and initialize BeautifulSoup
resp = urlopen(MODEM_URL)
body = resp.read()
soup = BeautifulSoup(body, 'html.parser')
# Parse HTML tables
ustream = soup.findAll("table")[3]
dstream = soup.findAll("table")[1]
# Get the number of channels in use by counting rows
uschan = len(ustream.findAll("tr")) - 1
dschan = len(dstream.findAll("tr")) - 1
if args.upstream:
upstream_list = getUpstreamInformation(ustream, uschan)
if not (args.raw):
upstream_list = normalize(upstream_list)
# Add telegraf tag
[d.update({'stream': 'upstream'}) for d in upstream_list]
# JSON-ify our lists
print(json.dumps(upstream_list, indent=2, sort_keys=True))
if args.downstream:
downstream_list = getDownstreamInformation(dstream, dschan)
if not (args.raw):
downstream_list = normalize(downstream_list)
# Add telegraf tag
[d.update({'stream': 'downstream'}) for d in downstream_list]
# JSON-ify our lists
print(json.dumps(downstream_list, indent=2, sort_keys=True))
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
########################################################################################################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment