Skip to content

Instantly share code, notes, and snippets.

@sergzin
Last active November 13, 2016 19:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergzin/363b2e2c01fd9ce29cd01794d452496c to your computer and use it in GitHub Desktop.
Save sergzin/363b2e2c01fd9ce29cd01794d452496c to your computer and use it in GitHub Desktop.
Device discovery component for ziggo wifi routers.
"""
Device discovery component for ziggo wifi routers.
Tested on Cisco EPC3928AD EuroDocsis 3.0 2-PORT Voice Gateway
Modem software version: e3928A-E10-5-c3220r5592-150612c-ZIG
Module parses output from login screen http://192.168.178.1/login_zig.asp
place this file in custom_components/device_tracker/ziggo_router.py
add in your configuration the following.
device_tracker:
- platform: ziggo_router
host: 192.168.178.1
username: ziggo
password: your_ziggo_password
"""
import logging
import requests
import voluptuous as vol
from datetime import timedelta, datetime
from collections import namedtuple
import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.util import Throttle
REQUIREMENTS = ['beautifulsoup4==4.4.1']
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=60)
PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string
})
Device = namedtuple('Device', ['mac', 'name', 'ip', 'last_update'])
def get_scanner(hass, config):
"""Validate the configuration and return an instance of scanner."""
try:
return ZiggoDeviceScanner(config[DOMAIN])
except ConnectionError:
return None
class ZiggoDeviceScanner(object):
"""Fetch connected devices from ziggo wifi router"""
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.params = {
'Zigloginnaam': config[CONF_USERNAME],
'Zigpassword': config[CONF_PASSWORD]
}
self.last_results = []
self.mac2name = {}
self.login_url = 'http://{host}/goform/login_zig'.format(host=self.host)
data = self._get_ziggo_clients()
if not data:
raise ConnectionError('Cannot connect to Ziggo router')
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Check for clients on wifi router.
:return: True if scan successful
:rtype: `bool`
"""
_LOGGER.info("Getting clients...")
result = self._get_ziggo_clients()
if result is None:
_LOGGER.info("No data received")
return False
last_results = []
for device in result:
mac = device['mac_address'].upper()
last_results.append(
Device(mac, device['hostname'], device['ipaddress'], device['active_since'])
)
self.mac2name[mac] = device['hostname']
self.last_results = last_results
_LOGGER.info("Scanning successful. Got {} clients.".format(len(self.last_results)))
return True
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [device.mac for device in self.last_results]
def get_device_name(self, mac):
"""
Return a name of device or None
:param mac: Mac address
:return: name of device
:rtype: str or None
"""
return self.mac2name.get(str.upper(mac), None)
def _get_ziggo_clients(self):
"""
Fetch data from wifi router.
:return: parsed out
:rtype: collection.iterable[dict] or None
"""
try:
with requests.Session() as session:
response = session.post(self.login_url, data=self.params)
return self._parse_table_gebruikers(response.content)
except requests.exceptions.Timeout:
_LOGGER.exception('Connection to the router timed out')
return None
except MissingHTMLTable:
_LOGGER.exception('Login page missing users table. check credentials.')
return None
@classmethod
def _parse_table_gebruikers(cls, html_input):
"""
Parse html table and return a list of connected clients.
:param html_input: HTML content
:type html_input: str
:return: List of clients
:rtype: list[dict]
"""
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_input, 'html.parser')
table = soup.find('table', {"class": "gebruikers"})
if not table:
raise MissingHTMLTable()
result = []
for row in table.findAll("tr", {"class": "blue"}):
cols = row.findAll("td")
active_since = cols[4].text.strip().replace(' ', '')
# datetime.strptime('13/11/2016 10: 8:35', '%d/%m/%Y%H:%M:%S')
result.append({
"hostname": cols[0].text,
"mac_address": cols[1].text,
"ipaddress": cols[2].text,
# "port": cols[3].text,
"active_since": datetime.strptime(active_since, '%d/%m/%Y%H:%M:%S')}
)
return result
class MissingHTMLTable(Exception):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment