Skip to content

Instantly share code, notes, and snippets.

@zajdee
Created June 19, 2019 18:33
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zajdee/7025a4e9221089208e667c7a2eefbc22 to your computer and use it in GitHub Desktop.
Save zajdee/7025a4e9221089208e667c7a2eefbc22 to your computer and use it in GitHub Desktop.
Liberty Global Compal CH7465LG dumper
#!/usr/bin/env python
import xml.dom.minidom
import xml.etree.ElementTree as ET
import json
import requests
import os
import lxml.etree as etree
def prettyPrintXml(filename):
x = etree.parse(filename)
return etree.tostring(x, pretty_print=True)
def elemdata(xml, elmname):
for child in xml:
if child.tag == elmname:
return child
return None
def elemtext(xml, elmname):
for child in xml:
if child.tag == elmname:
return child.text.strip()
return None
compalCookieJar = requests.cookies.RequestsCookieJar()
def httpPostGetFunc(func):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.post('http://192.168.0.1/xml/getter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d' % (token, func), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
def httpCompalGet(url):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.get('http://192.168.0.1%s' % (url), cookies=compalCookieJar, allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
def httpPostSetFunc(func):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.post('http://192.168.0.1/xml/setter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d' % (token, func), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
# simple login = open login page, then access to some xmlrpc elements is allowed
def compalLoginSimple():
# first init token
httpCompalGet('/common_page/login.html')
return
# full login (func 15) = open login page, then login with password to gain full access to xmlrpc
def compalLoginFull(password):
# first init token
httpCompalGet('/common_page/login.html')
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
func = 15 # login = 15
r = requests.post('http://192.168.0.1/xml/setter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d&Username=NULL&Password=%s' % (token, func, password), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return
# logout (func 16)
def compalLogout():
httpPostSetFunc(16) # 16 = logout
return
# reboot (func 133)
def compalReboot():
httpPostSetFunc(133) # 133 = reboot
return
def dict_from_cookiejar(cj):
"""Returns a key/value dictionary from a CookieJar.
:param cj: CookieJar object to extract cookies from.
:rtype: dict
"""
cookie_dict = {}
for cookie in cj:
cookie_dict[cookie.name] = cookie.value
return cookie_dict
# reads data from Compal modem
# uses cached files for now :-)
def getCompalData(func):
# tree = ET.parse('compal.func%d' % (func))
root = ET.fromstring(httpPostGetFunc(func))
# root = tree.getroot()
return root
def setCompalData(func, data):
return
# gets data about the cable network access
# {'HideRemoteAccess': 'False',
# 'GWOperMode': 'Disable',
# 'CmProvisionMode': 'IPv4',
# 'CountryID': '3',
# 'GwProvisionMode': 'Disable',
# 'HideModemMode': 'False',
# 'title': 'Connect Box',
# 'ConfigVenderModel': 'CH7465LG',
# 'SwVersion': 'CH7465LG-NCIP-4.50.18.22-1-NOSH',
# 'DsLite': '0',
# 'AccessDenied': 'NONE',
# 'LockedOut': 'Disable',
# 'Interface': '1',
# 'OperatorId': 'LIBERTYGLOBAL',
# 'AccessLevel': '1',
# 'PortControl': '0'}
def getAccessData():
root = getCompalData(1)
return itemize(root)
# gets details about the cable modem (mode, version, mac, serial, uptime, ...)
# The following is available from Compal function #2
# cm_docsis_mode: DOCSIS 3.0
# cm_hardware_version: 4.01
# cm_mac_addr: 54:67:51:D2:9D:60
# cm_serial_number: DDAP619801B4
# cm_system_uptime: 0day(s)16h:49m:41s
# cm_network_access: Allowed
def getCmData():
root = getCompalData(2)
return itemize(root)
# gets operational status data
# {'Bandmode': '4',
# 'WiFiDefalutKey2g': '7nr4PvAsmset',
# 'SSID5G': 'UPC689521C',
# 'PreSharedKey2gLength': '12',
# 'BssEnable2g': '1',
# 'cm_status': 'OPERATIONAL',
# 'SSID2G': 'UPC689521C',
# 'LanUserCount': '0',
# 'WiFiDefalutKey5g': '7nr4PvAsmset',
# 'PreSharedKey5gLength': '12',
# 'BssEnable5g': '1'}
def getCmStatusData():
root = getCompalData(5)
return itemize(root)
# gets data about frequency plan and locked channel frequency
# {'FrequencyPlan': '2', 'Frequency': '338000000'}
def getCmFrequencyPlanData():
root = getCompalData(6)
return itemize(root)
# gets list of downstream channels and their quality, returns dictionary with array of downstream channels:
# {'ds_num': '20',
# 'downstream': [
# {'IsMpegLocked': '1', 'IsQamLocked': '1', 'RxMER': '38.605', 'pow': '4',
# 'PreRs': '2059785035', 'PostRs': '172', 'snr': '38', 'freq': '458000000',
# 'IsFECLocked': '1', 'chid': '20', 'mod': '256qam'}, ...
# ]
# }
def getDownstreamData():
root = getCompalData(10)
ds = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "ds_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "downstream":
ds.append(itemize(item))
result['downstream'] = ds
return result
# returns data of unerrored, correctable and uncorrectable frames for downstream
# uses dsid as indexes to downstream channel list
# {'signal': [
# {'uncorrectable': '0', 'dsid': '20', 'correctable': '172', 'unerrored': '2059807663'},
# {'uncorrectable': '0', 'dsid': '19', 'correctable': '149', 'unerrored': '2059821651'}, ...
# ],
# 'sig_num': '20'}
def getDownstreamErrorData():
root = getCompalData(12)
ds = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "sig_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "signal":
ds.append(itemize(item))
result['signal'] = ds
return result
# gets list of upstream channels and their quality, returns dictionary with array of upstream channels:
# {'us_num': '6',
# 'upstream': [
# {'ustype': '3', 't2Timeouts': '0', 'power': '48', 'usid': '17',
# 'messageType': '29', 't4Timeouts': '0', 't1Timeouts': '0', 't3Timeouts': '0',
# 'freq': '30700000', 'srate': '5.120', 'channeltype': 'ATDMA', 'mod': '64qam'}, ...
# ]
# }
def getUpstreamData():
root = getCompalData(11)
us = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "us_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "upstream":
us.append(itemize(item))
result['upstream'] = us
return result
# gets data about operational status, chip and tuner temperatures (temps in degF?)
# {'OperState': 'OPERATIONAL', 'Temperature': '80', 'TunnerTemperature': '96'}
def getTemperatureData():
root = getCompalData(136)
return itemize(root)
# returns full service status
# {
# 'NumberOfCpes': '245',
# 'cm_comment': 'Operational',
# 'cm_docsis_mode': 'DOCSIS 3.0',
# 'provisioning_st': 'Online',
# 'cm_network_access': 'Allowed',
# 'FileName': '546751D29D60-10.121.128.1.bin',
# 'provisioning_st_num': '12',
# 'dMaxCpes': '4', -- maximum allowable cpes
# 'ds_num': '32', -- maximum bondable down channels
# 'us_num': '6', -- maximum bondable up channels
# 'bpiEnable': '1',
# 'downstream': [
# {'status': '0', 'state': '1', 'primarySettings': '0', 'freq': '0', 'chid': '0', 'mod': 'unknown'},
# ...
# {'status': '0', 'state': '4', 'primarySettings': '0', 'freq': '458000000', 'chid': '20', 'mod': '256qam'},
# ...
# {'status': '0', 'state': '4', 'primarySettings': '1', 'freq': '338000000', 'chid': '5', 'mod': '256qam'}
# ],
# 'upstream': [
# {'freq': '30700000', 'srate': '5.120', 'state': '4', 'power': '108', 'usid': '17'},
# {'freq': '37500000', 'srate': '5.120', 'state': '4', 'power': '109', 'usid': '18'},
# ...
# {'freq': '23200000', 'srate': '5.120', 'state': '4', 'power': '107', 'usid': '13'}
# ],
# 'serviceflow': [
# {'direction': '2', 'pMaxConcatBurst': '42600', 'pMaxTrafficRate': '42000000', 'pMaxTrafficBurst': '42600', 'Sfid': '453887', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '2', 'pMaxConcatBurst': '8192', 'pMaxTrafficRate': '160000', 'pMaxTrafficBurst': '9600', 'Sfid': '453895', 'pSchedulingType': '2', 'pMinReservedRate': '16000'},
# {'direction': '2', 'pMaxConcatBurst': '8192', 'pMaxTrafficRate': '96000', 'pMaxTrafficBurst': '9600', 'Sfid': '453896', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '2', 'pMaxConcatBurst': '9600', 'pMaxTrafficRate': '128000', 'pMaxTrafficBurst': '9600', 'Sfid': '453897', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '420000000', 'pMaxTrafficBurst': '42600', 'Sfid': '453888', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '160000', 'pMaxTrafficBurst': '3044', 'Sfid': '453898', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '96000', 'pMaxTrafficBurst': '9600', 'Sfid': '453899', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '25000000', 'pMaxTrafficBurst': '16320', 'Sfid': '453900', 'pSchedulingType': '1', 'pMinReservedRate': '0'}
# ]
# }
def getServiceStatus():
root = getCompalData(144)
us = []
ds = []
sf = []
result = {}
for item in root:
if item.tag == "upstream":
us.append(itemize(item))
elif item.tag == "downstream":
ds.append(itemize(item))
elif item.tag == "serviceflow":
sf.append(itemize(item))
else:
result[item.tag] = elemtext(root, item.tag)
result['downstream'] = ds
result['upstream'] = us
result['serviceflow'] = sf
return result
# gets device count and port status (unconnected ports not reported)
# {'Device': '1',
# 'ports': [
# {'Eth': '0', 'Speed': '1000'},
#--removed--{}, {}, {}
# ]
# }
def getPortStatus():
root = getCompalData(143)
ports = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "Device":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "port":
portdata = itemize(item)
if len(portdata) > 0:
ports.append(portdata)
result['ports'] = ports
return result
# converts xml structure to json dictionary (without recursion)
def itemize(root):
result = {}
for item in root:
if item.text != None:
result[item.tag] = elemtext(root, item.tag)
return result
def findPortById(ports, portId):
for port in ports:
if (int(port['Eth']) == portId):
return port
return None
######## LOGIN FIRST
#compalLoginSimple()
compalLoginFull('YourCompalPassword')
#f = httpPostGetFunc(2)
# category 001-009 - status
# func 1, not needed to be logged in, but limited set of information
#print getAccessData()
# func 2 - must be logged in
#print getCmData()
# func 3 - web captive portal, not logged in?
# func 5 - must be logged in
#print getCmStatusData()
# func 6 - must be logged in
#print getCmFrequencyPlanData()
# category 010 - 019 - DOCSIS channel status, Eventlog (Logged in for 019 Eventlog)
# func 10
# 15 = login, 16 = logout
FUNCS = []
FUNCS.extend(range(1,14))
FUNCS.extend(range(17,512))
for func in FUNCS:
print(func)
data = httpPostGetFunc(func)
with open('func%d.raw.xml' % (func), 'w') as f:
f.write(data)
try:
pretty = prettyPrintXml('func%d.raw.xml' % (func))
os.unlink('func%d.raw.xml' % (func))
with open('func%d.xml' % (func), 'w') as f:
f.write(pretty)
except:
os.unlink('func%d.raw.xml' % (func))
pass
############# LOGOUT IF NECESSARY
compalLogout()
#!/usr/bin/env python
import xml.dom.minidom
import xml.etree.ElementTree as ET
import json
import requests
def elemdata(xml, elmname):
for child in xml:
if child.tag == elmname:
return child
return None
def elemtext(xml, elmname):
for child in xml:
if child.tag == elmname:
return child.text.strip()
return None
compalCookieJar = requests.cookies.RequestsCookieJar()
def httpPostGetFunc(func):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.post('http://192.168.0.1/xml/getter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d' % (token, func), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
def httpCompalGet(url):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.get('http://192.168.0.1%s' % (url), cookies=compalCookieJar, allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
def httpPostSetFunc(func):
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
r = requests.post('http://192.168.0.1/xml/setter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d' % (token, func), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return r.text
# simple login = open login page, then access to some xmlrpc elements is allowed
def compalLoginSimple():
# first init token
httpCompalGet('/common_page/login.html')
return
# full login (func 15) = open login page, then login with password to gain full access to xmlrpc
def compalLoginFull(password):
# first init token
httpCompalGet('/common_page/login.html')
token = compalCookieJar.get('sessionToken')
if token == None:
token = ''
func = 15 # login = 15
r = requests.post('http://192.168.0.1/xml/setter.xml', cookies=compalCookieJar, data = 'token=%s&fun=%d&Username=NULL&Password=%s' % (token, func, password), allow_redirects=False)
for cookie in r.cookies:
compalCookieJar.set(cookie.name, cookie.value, domain='192.168.0.1', path='/')
return
# logout (func 16)
def compalLogout():
httpPostSetFunc(16) # 16 = logout
return
# reboot (func 133)
def compalReboot():
httpPostSetFunc(133) # 133 = reboot
return
def dict_from_cookiejar(cj):
"""Returns a key/value dictionary from a CookieJar.
:param cj: CookieJar object to extract cookies from.
:rtype: dict
"""
cookie_dict = {}
for cookie in cj:
cookie_dict[cookie.name] = cookie.value
return cookie_dict
# reads data from Compal modem
# uses cached files for now :-)
def getCompalData(func):
# tree = ET.parse('compal.func%d' % (func))
root = ET.fromstring(httpPostGetFunc(func))
# root = tree.getroot()
return root
def setCompalData(func, data):
return
# gets data about the cable network access
# {'HideRemoteAccess': 'False',
# 'GWOperMode': 'Disable',
# 'CmProvisionMode': 'IPv4',
# 'CountryID': '3',
# 'GwProvisionMode': 'Disable',
# 'HideModemMode': 'False',
# 'title': 'Connect Box',
# 'ConfigVenderModel': 'CH7465LG',
# 'SwVersion': 'CH7465LG-NCIP-4.50.18.22-1-NOSH',
# 'DsLite': '0',
# 'AccessDenied': 'NONE',
# 'LockedOut': 'Disable',
# 'Interface': '1',
# 'OperatorId': 'LIBERTYGLOBAL',
# 'AccessLevel': '1',
# 'PortControl': '0'}
def getAccessData():
root = getCompalData(1)
return itemize(root)
# gets details about the cable modem (mode, version, mac, serial, uptime, ...)
# The following is available from Compal function #2
# cm_docsis_mode: DOCSIS 3.0
# cm_hardware_version: 4.01
# cm_mac_addr: 54:67:51:D2:9D:60
# cm_serial_number: DDAP619801B4
# cm_system_uptime: 0day(s)16h:49m:41s
# cm_network_access: Allowed
def getCmData():
root = getCompalData(2)
return itemize(root)
# gets operational status data
# {'Bandmode': '4',
# 'WiFiDefalutKey2g': '7nr4PvAsmset',
# 'SSID5G': 'UPC689521C',
# 'PreSharedKey2gLength': '12',
# 'BssEnable2g': '1',
# 'cm_status': 'OPERATIONAL',
# 'SSID2G': 'UPC689521C',
# 'LanUserCount': '0',
# 'WiFiDefalutKey5g': '7nr4PvAsmset',
# 'PreSharedKey5gLength': '12',
# 'BssEnable5g': '1'}
def getCmStatusData():
root = getCompalData(5)
return itemize(root)
# gets data about frequency plan and locked channel frequency
# {'FrequencyPlan': '2', 'Frequency': '338000000'}
def getCmFrequencyPlanData():
root = getCompalData(6)
return itemize(root)
# gets list of downstream channels and their quality, returns dictionary with array of downstream channels:
# {'ds_num': '20',
# 'downstream': [
# {'IsMpegLocked': '1', 'IsQamLocked': '1', 'RxMER': '38.605', 'pow': '4',
# 'PreRs': '2059785035', 'PostRs': '172', 'snr': '38', 'freq': '458000000',
# 'IsFECLocked': '1', 'chid': '20', 'mod': '256qam'}, ...
# ]
# }
def getDownstreamData():
root = getCompalData(10)
ds = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "ds_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "downstream":
ds.append(itemize(item))
result['downstream'] = ds
return result
# returns data of unerrored, correctable and uncorrectable frames for downstream
# uses dsid as indexes to downstream channel list
# {'signal': [
# {'uncorrectable': '0', 'dsid': '20', 'correctable': '172', 'unerrored': '2059807663'},
# {'uncorrectable': '0', 'dsid': '19', 'correctable': '149', 'unerrored': '2059821651'}, ...
# ],
# 'sig_num': '20'}
def getDownstreamErrorData():
root = getCompalData(12)
ds = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "sig_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "signal":
ds.append(itemize(item))
result['signal'] = ds
return result
# gets list of upstream channels and their quality, returns dictionary with array of upstream channels:
# {'us_num': '6',
# 'upstream': [
# {'ustype': '3', 't2Timeouts': '0', 'power': '48', 'usid': '17',
# 'messageType': '29', 't4Timeouts': '0', 't1Timeouts': '0', 't3Timeouts': '0',
# 'freq': '30700000', 'srate': '5.120', 'channeltype': 'ATDMA', 'mod': '64qam'}, ...
# ]
# }
def getUpstreamData():
root = getCompalData(11)
us = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "us_num":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "upstream":
us.append(itemize(item))
result['upstream'] = us
return result
# gets data about operational status, chip and tuner temperatures (temps in degF?)
# {'OperState': 'OPERATIONAL', 'Temperature': '80', 'TunnerTemperature': '96'}
def getTemperatureData():
root = getCompalData(136)
return itemize(root)
# returns full service status
# {
# 'NumberOfCpes': '245',
# 'cm_comment': 'Operational',
# 'cm_docsis_mode': 'DOCSIS 3.0',
# 'provisioning_st': 'Online',
# 'cm_network_access': 'Allowed',
# 'FileName': '546751D29D60-10.121.128.1.bin',
# 'provisioning_st_num': '12',
# 'dMaxCpes': '4', -- maximum allowable cpes
# 'ds_num': '32', -- maximum bondable down channels
# 'us_num': '6', -- maximum bondable up channels
# 'bpiEnable': '1',
# 'downstream': [
# {'status': '0', 'state': '1', 'primarySettings': '0', 'freq': '0', 'chid': '0', 'mod': 'unknown'},
# ...
# {'status': '0', 'state': '4', 'primarySettings': '0', 'freq': '458000000', 'chid': '20', 'mod': '256qam'},
# ...
# {'status': '0', 'state': '4', 'primarySettings': '1', 'freq': '338000000', 'chid': '5', 'mod': '256qam'}
# ],
# 'upstream': [
# {'freq': '30700000', 'srate': '5.120', 'state': '4', 'power': '108', 'usid': '17'},
# {'freq': '37500000', 'srate': '5.120', 'state': '4', 'power': '109', 'usid': '18'},
# ...
# {'freq': '23200000', 'srate': '5.120', 'state': '4', 'power': '107', 'usid': '13'}
# ],
# 'serviceflow': [
# {'direction': '2', 'pMaxConcatBurst': '42600', 'pMaxTrafficRate': '42000000', 'pMaxTrafficBurst': '42600', 'Sfid': '453887', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '2', 'pMaxConcatBurst': '8192', 'pMaxTrafficRate': '160000', 'pMaxTrafficBurst': '9600', 'Sfid': '453895', 'pSchedulingType': '2', 'pMinReservedRate': '16000'},
# {'direction': '2', 'pMaxConcatBurst': '8192', 'pMaxTrafficRate': '96000', 'pMaxTrafficBurst': '9600', 'Sfid': '453896', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '2', 'pMaxConcatBurst': '9600', 'pMaxTrafficRate': '128000', 'pMaxTrafficBurst': '9600', 'Sfid': '453897', 'pSchedulingType': '2', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '420000000', 'pMaxTrafficBurst': '42600', 'Sfid': '453888', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '160000', 'pMaxTrafficBurst': '3044', 'Sfid': '453898', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '96000', 'pMaxTrafficBurst': '9600', 'Sfid': '453899', 'pSchedulingType': '1', 'pMinReservedRate': '0'},
# {'direction': '1', 'pMaxConcatBurst': '1522', 'pMaxTrafficRate': '25000000', 'pMaxTrafficBurst': '16320', 'Sfid': '453900', 'pSchedulingType': '1', 'pMinReservedRate': '0'}
# ]
# }
def getServiceStatus():
root = getCompalData(144)
us = []
ds = []
sf = []
result = {}
for item in root:
if item.tag == "upstream":
us.append(itemize(item))
elif item.tag == "downstream":
ds.append(itemize(item))
elif item.tag == "serviceflow":
sf.append(itemize(item))
else:
result[item.tag] = elemtext(root, item.tag)
result['downstream'] = ds
result['upstream'] = us
result['serviceflow'] = sf
return result
# gets device count and port status (unconnected ports not reported)
# {'Device': '1',
# 'ports': [
# {'Eth': '0', 'Speed': '1000'},
#--removed--{}, {}, {}
# ]
# }
def getPortStatus():
root = getCompalData(143)
ports = []
result = {}
for item in root:
# number of downstream channels bonded
if item.tag == "Device":
result[item.tag] = elemtext(root, item.tag)
if item.tag == "port":
portdata = itemize(item)
if len(portdata) > 0:
ports.append(portdata)
result['ports'] = ports
return result
# converts xml structure to json dictionary (without recursion)
def itemize(root):
result = {}
for item in root:
if item.text != None:
result[item.tag] = elemtext(root, item.tag)
return result
def findPortById(ports, portId):
for port in ports:
if (int(port['Eth']) == portId):
return port
return None
######## LOGIN FIRST
compalLoginSimple()
#compalLoginFull('YourCompalPassword')
#f = httpPostGetFunc(2)
# category 001-009 - status
# func 1, not needed to be logged in, but limited set of information
#print getAccessData()
# func 2 - must be logged in
#print getCmData()
# func 3 - web captive portal, not logged in?
# func 5 - must be logged in
#print getCmStatusData()
# func 6 - must be logged in
#print getCmFrequencyPlanData()
# category 010 - 019 - DOCSIS channel status, Eventlog (Logged in for 019 Eventlog)
# func 10
svc = getServiceStatus()
ports = getPortStatus()
ds = getDownstreamData()
us = getUpstreamData()
temp = getTemperatureData()
### DOWNSTREAM DATA - linked speed (find the fastest speed of provisioned flows - e.g. inet, provisioning, mta and upcwifi flows)
up = 0
down = 0
for flow in svc['serviceflow']:
# print only the fastest speeds
if int(flow['direction']) == 1:
if int(flow['pMaxTrafficRate']) > down:
down = int(flow['pMaxTrafficRate'])
elif int(flow['direction']) == 2:
if int(flow['pMaxTrafficRate']) > up:
up = int(flow['pMaxTrafficRate'])
else:
pass
# up.append(flow['pMaxTrafficRate']) if int(flow['direction']) == 2 else down.append(flow['pMaxTrafficRate'])
### DOWNSTREAM DATA - linked, attainable speeds and snr
print "ds.speed.linked\t%d" % (down)
print "ds.speed.attainable\t%d" % (int(ds['ds_num'])*50000000)
for channel in ds['downstream']:
print "ds.snr\t%d\t%0.2f" % (int(channel['freq']), float(channel['RxMER']))
# func 12
#print getDownstreamErrorData()
# func 11
### UPSTREAM DATA - linked, attainable speeds and snr
print "us.speed.linked\t%d" % (up)
print "us.speed.attainable\t%d" % (int(us['us_num'])*27000000)
for channel in us['upstream']:
print "us.snr\t%d\t%0.2f" % (int(channel['freq']), float(channel['power']))
####### TEMPERATURE AND MAXIMUM NUMBER OF DEVICES CONNECTABLE TO NETWORK
# category 021, 022, 024 - web interface language, login timer, etc. Not needed to be logged in.
# category 100 - 1xx - router settings (incl. parental control) - 123, 128, 136, 143, 144 - available without logging in; for others must be logged in
# interesting items:
# - 136 - temperature
# - 137, 143, 147 - port status (137 with "ethflaplistFile; 147 - only ethflaplistFile - NULL)
# - 144 - service status (complete)
print "temp.cpu\t%d" % (int(temp['Temperature']))
print "temp.tuner\t%d" % (int(temp['TunnerTemperature']))
portIds = [0, 1, 2, 3]
#for port in ports['ports']:
for portId in portIds:
port = findPortById(ports['ports'], portId)
if port != None:
print "port.eth%d\t%d" % (int(port['Eth']), int(port['Speed']))
else:
print "port.eth%d\tU" % portId
print "count.macallowed\t%d" % (int(svc['dMaxCpes']))
# category 300 - 3xx - wireless config, status, keys, etc. Must be logged in.
# category 500 - 5xx - telephony config, status, keys, etc. Must be logged in.
############# LOGOUT IF NECESSARY
#compalLogout()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment