Last active
November 13, 2016 19:31
-
-
Save sergzin/363b2e2c01fd9ce29cd01794d452496c to your computer and use it in GitHub Desktop.
Device discovery component for ziggo wifi routers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Device discovery component for Ziggo wifi routers.

Tested on a Cisco EPC3928AD EuroDocsis 3.0 2-PORT Voice Gateway.
Modem software version: e3928A-E10-5-c3220r5592-150612c-ZIG

The module parses the output of the login screen at
http://192.168.178.1/login_zig.asp

Place this file in custom_components/device_tracker/ziggo_router.py
and add the following to your configuration:

device_tracker:
  - platform: ziggo_router
    host: 192.168.178.1
    username: ziggo
    password: your_ziggo_password
""" | |
import logging | |
import requests | |
import voluptuous as vol | |
from datetime import timedelta, datetime | |
from collections import namedtuple | |
import homeassistant.helpers.config_validation as cv | |
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME | |
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA | |
from homeassistant.util import Throttle | |
# Third-party package needed by the HTML table parser in this module.
REQUIREMENTS = ['beautifulsoup4==4.4.1']

_LOGGER = logging.getLogger(__name__)

# Interval handed to the Throttle decorator on the scanner's update method.
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=60)

# Schema.extend() returns a NEW schema and leaves the original untouched, so
# the result has to be bound back to PLATFORM_SCHEMA; otherwise the host,
# username and password keys below never become part of the schema.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_HOST): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Required(CONF_USERNAME): cv.string
})

# One connected client as reported by the router.
Device = namedtuple('Device', ['mac', 'name', 'ip', 'last_update'])
def get_scanner(hass, config):
    """Build the Ziggo scanner from the platform configuration.

    :param hass: Home Assistant instance (not used here)
    :param config: full configuration; the section under ``DOMAIN`` is
        handed to the scanner
    :return: a ``ZiggoDeviceScanner``, or None when constructing it raised
        ``ConnectionError``
    """
    try:
        scanner = ZiggoDeviceScanner(config[DOMAIN])
    except ConnectionError:
        return None
    else:
        return scanner
class ZiggoDeviceScanner(object):
    """Fetch the devices connected to a Ziggo wifi router.

    The scanner posts the configured credentials to the router's login form
    and parses the ``gebruikers`` HTML table found in the response.
    """

    # Seconds to wait for the router before giving up. Without an explicit
    # timeout ``requests`` may block indefinitely and never raise ``Timeout``.
    REQUEST_TIMEOUT = 10

    def __init__(self, config):
        """Initialize the scanner and check that the router can be queried.

        :param config: platform configuration providing ``CONF_HOST``,
            ``CONF_USERNAME`` and ``CONF_PASSWORD``
        :raises ConnectionError: if the initial query returned no data
        """
        self.host = config[CONF_HOST]
        self.params = {
            'Zigloginnaam': config[CONF_USERNAME],
            'Zigpassword': config[CONF_PASSWORD]
        }
        self.last_results = []
        self.mac2name = {}
        self.login_url = 'http://{host}/goform/login_zig'.format(host=self.host)

        # Only ``None`` signals a failed query. An empty list means the
        # router answered but currently lists no clients, which is not a
        # connection error.
        if self._get_ziggo_clients() is None:
            raise ConnectionError('Cannot connect to Ziggo router')

    @Throttle(MIN_TIME_BETWEEN_SCANS)
    def _update_info(self):
        """Check for clients on the wifi router and refresh the caches.

        Updates ``self.last_results`` and ``self.mac2name``.
        NOTE(review): the Throttle decorator presumably limits how often
        this actually runs; confirm its return value when throttled.

        :return: True if the scan was successful, False if no data received
        :rtype: bool
        """
        _LOGGER.info("Getting clients...")
        result = self._get_ziggo_clients()
        if result is None:
            _LOGGER.info("No data received")
            return False

        last_results = []
        for device in result:
            mac = device['mac_address'].upper()
            last_results.append(
                Device(mac, device['hostname'], device['ipaddress'],
                       device['active_since'])
            )
            # Names are accumulated across scans so a device that dropped
            # off can still be resolved by MAC.
            self.mac2name[mac] = device['hostname']
        self.last_results = last_results
        _LOGGER.info("Scanning successful. Got %d clients.",
                     len(self.last_results))
        return True

    def scan_devices(self):
        """Scan for new devices and return a list with found device IDs.

        :return: upper-cased MAC addresses from the most recent results
        :rtype: list[str]
        """
        self._update_info()
        return [device.mac for device in self.last_results]

    def get_device_name(self, mac):
        """Return the name of a device, or None if the MAC is unknown.

        :param mac: MAC address (case-insensitive)
        :return: name of device
        :rtype: str or None
        """
        return self.mac2name.get(mac.upper())

    def _get_ziggo_clients(self):
        """Fetch and parse the client table from the wifi router.

        All request failures and a missing table are logged and reported
        as ``None`` so callers only need to distinguish "no data" from a
        (possibly empty) list of clients.

        :return: parsed clients, or None on failure
        :rtype: list[dict] or None
        """
        try:
            with requests.Session() as session:
                response = session.post(self.login_url, data=self.params,
                                        timeout=self.REQUEST_TIMEOUT)
                return self._parse_table_gebruikers(response.content)
        except requests.exceptions.Timeout:
            _LOGGER.exception('Connection to the router timed out')
            return None
        except requests.exceptions.RequestException:
            # Covers refused connections, DNS failures, etc. These are not
            # the builtin ConnectionError, so they must be caught here.
            _LOGGER.exception('Cannot connect to the router')
            return None
        except MissingHTMLTable:
            _LOGGER.exception('Login page missing users table. check credentials.')
            return None

    @classmethod
    def _parse_table_gebruikers(cls, html_input):
        """Parse the HTML users table and return the connected clients.

        Rows with fewer than five cells are skipped. Text of the hostname,
        MAC and IP cells is stripped of surrounding whitespace.

        :param html_input: HTML content of the login response
        :type html_input: str or bytes
        :return: list of clients with keys ``hostname``, ``mac_address``,
            ``ipaddress`` and ``active_since`` (datetime, or None if the
            timestamp could not be parsed)
        :rtype: list[dict]
        :raises MissingHTMLTable: if the page has no ``gebruikers`` table
        """
        # Imported lazily: the package is declared in REQUIREMENTS and may
        # not be importable when this module is first loaded.
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(html_input, 'html.parser')
        table = soup.find('table', {"class": "gebruikers"})
        if table is None:
            raise MissingHTMLTable()

        result = []
        for row in table.find_all("tr", {"class": "blue"}):
            cols = row.find_all("td")
            if len(cols) < 5:
                # Not a client row (or an unexpected layout); one odd row
                # must not abort the whole scan.
                _LOGGER.debug("Skipping row with %d cells", len(cols))
                continue
            result.append({
                "hostname": cols[0].text.strip(),
                "mac_address": cols[1].text.strip(),
                "ipaddress": cols[2].text.strip(),
                # cols[3] holds the port and is not used.
                "active_since": cls._parse_active_since(cols[4].text),
            })
        return result

    @staticmethod
    def _parse_active_since(raw):
        """Convert the router's "active since" cell text to a datetime.

        The router pads single-digit fields with spaces, for example
        ``'13/11/2016 10: 8:35'``, so all spaces are removed before parsing.

        :param raw: text of the "active since" cell
        :type raw: str
        :return: parsed timestamp, or None if the text is malformed
        :rtype: datetime or None
        """
        compact = raw.strip().replace(' ', '')
        try:
            return datetime.strptime(compact, '%d/%m/%Y%H:%M:%S')
        except ValueError:
            _LOGGER.debug("Cannot parse active-since value %r", raw)
            return None
class MissingHTMLTable(Exception):
    """Raised when the router's page does not contain the users table."""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment