Created
February 1, 2021 20:00
-
-
Save clyra/3fa93e8b7746a598ff38c2b7ba135866 to your computer and use it in GitHub Desktop.
Unifi sensor for HA (api changes)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Unifi sensor. Shows the total number os devices connected. Also shows the number of devices per | |
AP and per essid as attributes. | |
with code from https://github.com/frehov/Unifi-Python-API | |
Version 0.1 | |
""" | |
from datetime import timedelta | |
from requests import Session | |
import json | |
import re | |
from typing import Pattern, Dict, Union | |
URL = 'https://192.168.0.xx:8443' | |
USERNAME = 'admin' | |
PASSWORD = 'password' | |
DEFAULT_NAME = 'Unifi' | |
DEFAULT_SITE = 'default' | |
DEFAULT_VERIFYSSL = False | |
UDM = False | |
class UnifiSensorData(object): | |
""" | |
Unifi API for the Unifi Controller. | |
""" | |
_login_data = {} | |
_current_status_code = None | |
def __init__(self, hass, username, password, site, baseurl, verify_ssl, udm): | |
""" | |
Initiates tha api with default settings if none other are set. | |
:param username: username for the controller user | |
:param password: password for the controller user | |
:param site: which site to connect to (Not the name you've given the site, but the url-defined name) | |
:param baseurl: where the controller is located | |
:param verify_ssl: Check if certificate is valid or not, throws warning if set to False | |
""" | |
self._hass = hass | |
self._login_data['username'] = username | |
self._login_data['password'] = password | |
self._site = site | |
self._verify_ssl = verify_ssl | |
self._baseurl = baseurl | |
self._session = Session() | |
self._ap_list = {} | |
self.total = 0 | |
self.attrs = {} | |
if udm == True: | |
self.loginurl = baseurl + '/api/auth/login' | |
self._baseurl = baseurl + '/proxy/network' | |
else: | |
self.loginurl = baseurl + '/api/login' | |
self._baseurl = baseurl | |
def __enter__(self): | |
""" | |
Contextmanager entry handle | |
:return: isntance object of class | |
""" | |
self.login() | |
return self | |
def __exit__(self, *args): | |
""" | |
Contextmanager exit handle | |
:return: None | |
""" | |
self.logout() | |
def login(self): | |
""" | |
Log the user in | |
:return: None | |
""" | |
self._current_status_code = self._session.post(self.loginurl, data=json.dumps(self._login_data), verify=self._verify_ssl).status_code | |
if self._current_status_code == 400: | |
#_LOGGER.error("Failed to log in to api with provided credentials") | |
print("Failed to log in to api with provided credentials") | |
def logout(self): | |
""" | |
Log the user out | |
:return: None | |
""" | |
self._session.get("{}/logout".format(self._baseurl)) | |
self._session.close() | |
def list_clients(self) -> list: | |
""" | |
List all available clients from the api | |
:return: A list of clients on the format of a dict | |
""" | |
r = self._session.get("{}/api/s/{}/stat/sta".format(self._baseurl, self._site, verify=self._verify_ssl), data="json={}") | |
self._current_status_code = r.status_code | |
if self._current_status_code == 401: | |
_LOGGER.error("Unifi: Invalid login, or login has expired") | |
return None | |
data = r.json()['data'] | |
return data | |
def list_devices(self, mac=None) -> list: | |
""" | |
List all available devices from the api | |
:param mac: if defined, return information for this device only | |
:return: A list of devices on the format of a dict | |
""" | |
r = self._session.get("{}/api/s/{}/stat/device/{}".format(self._baseurl, self._site, mac, selfverify=self._verify_ssl), data="json={}") | |
self._current_status_code = r.status_code | |
if self._current_status_code == 401: | |
#_LOGGER.error("Unifi: Invalid login, or login has expired") | |
print("Unifi: Invalid login, or login has expired") | |
return None | |
data = r.json()['data'] | |
return data | |
def update_ap_list(self, newmac): | |
device_info = (self.list_devices(mac=newmac)) | |
self._ap_list[newmac] = "AP_" + device_info[0]['name'] | |
def update(self): | |
self.login() | |
self.total = 0 | |
self.attrs = {} | |
devices_per_essid = {} | |
devices_per_ap = {} | |
devices_per_ap_name = {} | |
device_list = (self.list_clients()) | |
for device in device_list: | |
self.total += 1 | |
if device['essid'] in devices_per_essid.keys(): | |
devices_per_essid[device['essid']] += 1 | |
else: | |
devices_per_essid[device['essid']] = 1 | |
if device['ap_mac'] in devices_per_ap.keys(): | |
devices_per_ap[device['ap_mac']] += 1 | |
else: | |
devices_per_ap[device['ap_mac']] = 1 | |
for ap in devices_per_ap.keys(): | |
if ap in self._ap_list.keys(): | |
devices_per_ap_name[self._ap_list[ap]] = devices_per_ap[ap] | |
else: | |
self.update_ap_list(ap) | |
devices_per_ap_name[self._ap_list[ap]] = devices_per_ap[ap] | |
#update attrs | |
for key in devices_per_essid.keys(): | |
self.attrs[key] = devices_per_essid[key] | |
for key in devices_per_ap_name.keys(): | |
self.attrs[key] = devices_per_ap_name[key] | |
self.logout() | |
if __name__ == '__main__': | |
hass = {} | |
unifi = UnifiSensorData(hass, USERNAME, PASSWORD, DEFAULT_SITE, URL, DEFAULT_VERIFYSSL, UDM) | |
unifi.login() | |
data = unifi.list_clients() | |
print(data) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment