Skip to content

Instantly share code, notes, and snippets.

@piotrmaslanka
Created February 15, 2017 22:24
Show Gist options
  • Save piotrmaslanka/3515fd1955ffbe2c6511f46aac498994 to your computer and use it in GitHub Desktop.
Save piotrmaslanka/3515fd1955ffbe2c6511f46aac498994 to your computer and use it in GitHub Desktop.
smok.co: enumerate GSM BTS data + external temperature at a place
from sai.devices import Sensor, Pathpoint, get_all_devices, slTREND
from sai.orders import acADVISE, Section
from sai.basic import ur
def find_applicable_devices():
"""
Finds devices that are applicable to be taken under QoR.
:return: list of Device objects
"""
# Step 1: Find all devices that have csqtrack installed
devices_with_csqtrack = []
for device in get_all_devices():
found_csqtrack = False
for cons in device.get_device_handler_api().get_associated_devices():
if cons.get_device_handler_api().configuration.startswith('csqtrack'):
found_csqtrack = True
break
if found_csqtrack:
devices_with_csqtrack.append(device)
# Step 2: This device must declare 'temp external'
devices_with_external_and_csqtrack = []
for device in devices_with_csqtrack:
try:
ts, v = Sensor(device.device_id, 'temp external').get()
except Sensor.DoesNotExist:
continue
except (Sensor.NotReadedError, Sensor.ReadFailedError):
continue
else:
devices_with_external_and_csqtrack.append(device)
return devices_with_external_and_csqtrack
def force_read_lac_cid(devices):
"""
This declares (if not existent) and forces a read of LAC and CID.
This forces read of external temperature as well.
:param devices: list of Device objects
"""
for device in devices:
device_id = device.device_id
# Do Watlac and Watcid exist?
if not Pathpoint(device_id, 'Watlac').exists():
Pathpoint.define(device_id, 'Watlac', slTREND)
if not Pathpoint(device_id, 'Watcid').exists():
Pathpoint.define(device_id, 'Watcid', slTREND)
# create a read barrage
q = Section()
q.absorb(Pathpoint(device_id, 'Watlac').read(acADVISE))
q.absorb(Pathpoint(device_id, 'Watcid').read(acADVISE))
q.absorb(Sensor(device_id, 'temp external').read(acADVISE))
device.execute_sections([q])
def show_results(devices):
"""Show results in reading shit"""
for device in devices:
device_id = device.device_id
try:
ts, lac = Pathpoint(device_id, 'Watlac').get()
ts, cid = Pathpoint(device_id, 'Watcid').get()
ts, temp = Sensor(device_id, 'temp external').get()
if isinstance(temp, ur.Quantity): temp = temp.magnitude # if unit-aware
except (Sensor.ReadFailedError, Sensor.DoesNotExist, Sensor.NotReadedError):
continue
print '%s:%s => %s C' % (lac, cid, temp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment