Last active
February 1, 2023 14:52
-
-
Save toxicantidote/6e123d8d113e2f1331171ddb1f55bee5 to your computer and use it in GitHub Desktop.
Network device scanner with DNS, UniFi query and switch port/PoE query
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
## | |
## Network device scanner with HTML output, DNS resolution, UniFi info and | |
## PoE/port info. | |
## | |
## Required commands: arping, snmpwalk, host | |
## Required Python3 libraries: requests, Python Imaging Library | |
## Required infrastructure and services: UniFi controller, PoE switch with SNMP, | |
## dnsmasq (DHCP and DNS) | |
## Also requires ieee-data package installed. | |
## | |
## Finds devices on the local network using arping, then does DNS resolution. | |
## After this, the network switch is queried via SNMP to determine which port | |
## the device is connected to, and also whether it is using PoE, and, if so, how | |
## much power. The specified UniFi controller is also queried to determine if | |
## the device is connected via wireless, and, if so, via what AP and at what | |
## signal strength. | |
## | |
## All of this information is then collated and processed to produce a HTML | |
## table that is written to a file. It is intended to be embedded in other pages | |
## using SSI (Server-Side Includes). Re-runs every five minutes. | |
## | |
## Tested with: | |
## - Debian Buster | |
## - UniFi controller controlling two UniFi AP-AC-LITE | |
## - Nortel BayStack 5520 48T-PWR | |
## Network interface to bind to | |
bindInterface = 'eth0' | |
## IP address of the bound interface | |
bindAddress = '192.168.0.1' | |
## Network mask of the bound interface | |
bindMask = '255.255.255.0' | |
## Number of scan threads to run concurrently | |
scanThreads = 100 | |
## Output file | |
htmlOutFile = 'networkDevices.html' | |
## OUI MAC vendor database location | |
ouiPath = '/usr/share/ieee-data/oui.txt' | |
## dnsmasq leases file path | |
leasePath = '/var/lib/misc/dnsmasq.leases' | |
## DNS server ip | |
dnsServer = '127.0.0.1' | |
## UniFi controller | |
unifiController = '127.0.0.1' | |
## UniFi controller username/password | |
unifiUsername = 'user' | |
unifiPassword = 'password' | |
## Names for different UniFi AP MACs | |
unifiNames = {'f0:9f:c2:aa:bb:cc': 'Inside wireless', 'f0:9f:c2:11:22:33': 'Outside wireless'} | |
## Switch address | |
switchAddress = 'switch' | |
## Switch SNMP MAC OID prefix | |
switch_oid_mac = 'iso.3.6.1.2.1.17.4.3.1.1' | |
## Switch SNMP port OID prefix | |
switch_oid_port = 'iso.3.6.1.2.1.17.4.3.1.2' | |
## Switch SNMP speed OID prefix | |
switch_oid_speed = 'iso.3.6.1.2.1.2.2.1.5' | |
## Switch SNMP PoE class OID prefix | |
switch_oid_poe_state = 'iso.3.6.1.2.1.105.1.1.1.6.1' | |
## Switch SNMP PoE class OID prefix | |
switch_oid_poe_class = 'iso.3.6.1.2.1.105.1.1.1.10.1' | |
## Switch SNMP PoE usage (combined) counters prefix | |
switch_oid_poe_usage = 'iso.3.6.1.2.1.105.1.3.1.1' | |
### | |
from multiprocessing.pool import ThreadPool | |
import time | |
import subprocess | |
import netaddr | |
import random | |
import re | |
import datetime | |
import socket | |
import requests | |
import urllib3 | |
urllib3.disable_warnings() | |
class wirelessClient(): | |
def __init__(self, mac, radio_proto, rx_rate, tx_rate, signal, ap_mac): | |
self.mac = mac | |
self.radio_proto = radio_proto | |
self.rx_rate = rx_rate | |
self.tx_rate = tx_rate | |
self.signal = signal | |
self.ap_mac = ap_mac | |
def getPortPower(switchAddress, oid_poe_state, oid_poe_class): | |
poeState = dict() | |
poeClass = dict() | |
output1 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_state]) | |
for line in output1.decode('utf-8').split('\n'): | |
regexp_state = re.search(oid_poe_state + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line) | |
if regexp_state: | |
port = regexp_state.group(1) | |
state = int(regexp_state.group(2)) | |
poeState[str(port)] = state | |
output2 = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_class]) | |
for line in output2.decode('utf-8').split('\n'): | |
regexp_class = re.search(oid_poe_class + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line) | |
if regexp_class: | |
port = regexp_class.group(1) | |
pclass = int(regexp_class.group(2)) | |
## only add class if delivering power. otherwise set None | |
if poeState[str(port)] == 3: | |
poeClass[str(port)] = pclass - 1 | |
else: | |
poeClass[str(port)] = None | |
return poeClass | |
def getPortSpeeds(switchAddress, oid_speed): | |
speedPort = dict() | |
output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_speed]) | |
for line in output.decode('utf-8').split('\n'): | |
regexp_speed = re.search(oid_speed + r'\.(\d+)\s=\sGauge32:\s(\d+)', line) | |
if regexp_speed: | |
port = regexp_speed.group(1) | |
speed = int(regexp_speed.group(2)) | |
speedPort[str(port)] = speed | |
return speedPort | |
def getPortMacs(switchAddress, oid_mac, oid_port): | |
mapping = dict() | |
output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, '.1.3.6.1.2.1.17.4.3']) | |
for line in output.decode('utf-8').split('\n'): | |
regexp_mac = re.search(oid_mac + r'\.([\d\.]+)\s=\sHex-STRING:\s(((([A-F0-9]{2})\s){5})([A-F0-9]{2}))', line) | |
regexp_port = re.search(oid_port + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)', line) | |
if regexp_mac: | |
id = regexp_mac.group(1) | |
mac = ':'.join(regexp_mac.group(2).split(' ')) | |
mapping[id] = [mac] | |
elif regexp_port: | |
id = regexp_port.group(1) | |
port = regexp_port.group(2) | |
mapping[id].append(port) | |
macPort = [] | |
for id in mapping.keys(): | |
macPort.append([mapping[id][0], mapping[id][1]]) | |
return macPort | |
def makePowerGraph(switchAddress, oid_poe_usage): | |
power_max = 0 | |
power_cur = 0 | |
output = subprocess.check_output(['snmpwalk', '-c', 'public', '-v', '2c', switchAddress, oid_poe_usage]) | |
for line in output.decode('utf-8').split('\n'): | |
regexp_max = re.search(oid_poe_usage + r'\.2.1\s=\sGauge32:\s(\d+)', line) | |
regexp_cur = re.search(oid_poe_usage + r'\.4.1\s=\sGauge32:\s(\d+)', line) | |
if regexp_max: | |
power_max = int(regexp_max.group(1)) | |
elif regexp_cur: | |
power_cur = int(regexp_cur.group(1)) | |
divider = power_max / 100 | |
percent = int(power_cur / divider) | |
remain = 100 - percent | |
htmlOut = '<div id="ndl-poeUsage-title">Switch PoE usage - ' + str(percent) + '%</div><table id="ndl-poeUsage"><tr>' | |
for i in range(0, percent): | |
htmlOut += '<td class="ndl-poeUsage-on"> </td>' | |
for i in range(0, remain): | |
htmlOut += '<td class="ndl-poeUsage-off"> </td>' | |
htmlOut += '</tr></table>' | |
return htmlOut | |
def findPortForMac(macPort, smac): | |
for mac, port in macPort: | |
if smac.upper() == mac.upper(): | |
return port | |
return None | |
def getWirelessClients(unifiController, unifiUsername, unifiPassword): | |
print('Getting UniFi wireless clients from ' + unifiController) | |
global wirelessClient | |
clientList = [] | |
try: | |
payload = {'username': unifiUsername, 'password': unifiPassword} | |
login = requests.post('https://' + unifiController + ':8443/api/login', json = payload, verify = False) | |
cookies = login.cookies | |
sta = requests.get('https://' + unifiController + ':8443/api/s/default/stat/sta', cookies = cookies, verify = False) | |
for client in sta.json()['data']: | |
if 'radio_proto' not in client.keys(): continue ## ignore wired clients | |
clientList.append(wirelessClient(client['mac'], client['radio_proto'].upper(), client['rx_rate'], client['tx_rate'], client['signal'], client['ap_mac'])) | |
except: | |
print('Failed to get wireless clients from UniFi!') | |
return clientList | |
def getClientInfo(clientList, mac): | |
for client in clientList: | |
if client.mac.upper() == mac.upper(): | |
return client | |
return None | |
def getHost(ip): | |
global dnsServer | |
hostname = 'Unknown' | |
try: | |
for line in str(subprocess.check_output(['host', '-W', '1', '-r', ip, dnsServer])).split('\n'): | |
regexp_host = re.search(r'domain name pointer ([^\.]+)\.', str(line)) | |
if regexp_host: | |
hostname = regexp_host.group(1) | |
break | |
except: | |
pass | |
return hostname | |
def getLeaseExpiry(ip): | |
fSock = open(leasePath, 'r') | |
expiry = 'Static assignment' | |
for line in fSock: | |
if line.split(' ')[2] == ip: | |
try: | |
leaseExpiry = int(line.split(' ')[0]) | |
expiry = datetime.datetime.fromtimestamp(leaseExpiry).strftime('%a, %b %d %Y %H:%M:%S') | |
except: | |
pass | |
break | |
fSock.close() | |
return expiry | |
def checkHost(ip): | |
global ipLength, doneCount, foundDevices, bindInterface | |
doneCount += 1 | |
print('\r Checking ' + str(doneCount) + '/' + str(ipLength) + ' (' + str(ip) + '). Found ' + str(len(foundDevices)) + ' device(s)...', end = '', flush = True) | |
try: | |
arpOutput = subprocess.check_output(['arping', '-r', '-c', '3', '-C', '1', '-i', bindInterface, ip]) | |
except: | |
pass | |
else: | |
mac = arpOutput.decode('utf-8').replace('\n', '').upper()[0:17] | |
vendor = getVendor(mac) | |
foundDevices.append([ip, mac, vendor]) | |
def getTargetIPs(): | |
targetIPs = [] | |
targetIPs += netaddr.IPNetwork(bindAddress + '/' + bindMask) | |
cleanList = [] | |
for target in targetIPs: | |
cleanList.append(str(target)) | |
return cleanList | |
def getVendor(mac): | |
global ouiData | |
mac = str(mac).upper() | |
searchMAC = re.sub(r'(([0-9A-F]{2}\:?){3})\:(([0-9A-F]{2}\:?){3})', r'\1', mac) | |
foundVendor = False | |
for line in ouiData.split('\n'): | |
regexp_info = re.search('(([0-9A-F]{2}\-){2}([0-9A-F]{2}))\s+\(hex\)\s+(.+)', line) | |
if regexp_info: | |
mac = re.sub(r'\-', ':', regexp_info.group(1)) | |
if mac == searchMAC: | |
foundVendor = True | |
vendor = regexp_info.group(4) | |
return(vendor) | |
return('Unknown') | |
print('Initialising..') | |
pool = ThreadPool(processes=scanThreads) | |
print('Reading OUI database..') | |
ouiDB = open(ouiPath, encoding = 'utf-8') | |
ouiData = ouiDB.read() | |
ouiDB.close() | |
print('Getting IP list..') | |
## get the IP list in random order | |
ipList = getTargetIPs() | |
random.shuffle(ipList) | |
ipLength = len(ipList) | |
## run indefinitely | |
childThreads = [] | |
while True: | |
foundDevices = [] | |
doneCount = 0 | |
print('Starting scan threads..') | |
for ipAddress in ipList: | |
childThreads.append(pool.apply_async(checkHost, (ipAddress, ))) | |
wifiClients = getWirelessClients(unifiController, unifiUsername, unifiPassword) | |
portMacs = getPortMacs(switchAddress, switch_oid_mac, switch_oid_port) | |
portSpeeds = getPortSpeeds(switchAddress, switch_oid_speed) | |
portPower = getPortPower(switchAddress, switch_oid_poe_state, switch_oid_poe_class) | |
print('Waiting for scan to complete..') | |
for ct in childThreads: | |
try: | |
ct.get(30) | |
except: | |
pass | |
time.sleep(2) | |
print('Scan complete') | |
## Sort the list by IP. Uses socket.inet_aton to convert the IPs to a easily sortable string representation | |
foundDevices.sort(key=lambda k:(socket.inet_aton(k[0]))) | |
# print('Devices found:') | |
# for ip, mac, vendor in foundDevices: | |
# print('IP ' + str(ip) + ' MAC ' + str(mac) + ' VENDOR ' + str(vendor)) | |
## do html output | |
print('Writing HTML output..') | |
htmlOut = open(htmlOutFile, mode='w') | |
htmlOut.write('<div id="networkDevices">') | |
htmlOut.write('Last updated ' + str(datetime.datetime.fromtimestamp(time.time()).strftime('%a, %b %d %Y at %H:%M:%S'))) | |
htmlOut.write('<table id="networkDeviceList"><tr><th>Hostname</th><th>MAC address</th><th>Speed (TX/RX)</th><th>Connection method</th><th>DHCP lease expires</th></tr>') | |
for ip, mac, vendor in foundDevices: | |
wifiInfo = getClientInfo(wifiClients, mac) | |
htmlOut.write('<tr><td>' + getHost(ip) + '<div class="ndl-smalltext2">' + str(ip) + '</div></td><td>' + str(mac) + '<div class="ndl-smalltext1">' + str(vendor) + '</td>') | |
if wifiInfo != None: | |
if wifiInfo.radio_proto == 'NA': | |
wifiInfo.radio_proto = 'N' | |
elif wifiInfo.radio_proto == 'NG': | |
wifiInfo.radio_proto = 'G' | |
if wifiInfo.signal < -90: | |
signalQuality = 'Unusable' | |
elif wifiInfo.signal < -80: | |
signalQuality = 'Poor' | |
elif wifiInfo.signal < -70: | |
signalQuality = 'Average' | |
elif wifiInfo.signal < -67: | |
signalQuality = 'Good' | |
elif wifiInfo.signal < -30: | |
signalQuality = 'Excellent' | |
else: | |
signalQuality = 'Unknown' | |
try: | |
apInfo = '<div class="ndl-smalltext2">Connected to ' + unifiNames[wifiInfo.ap_mac] + '</div>' | |
except: | |
apInfo = '' | |
htmlOut.write('<td>' + str(int(wifiInfo.tx_rate/1000)) + 'mbps/' + str(int(wifiInfo.rx_rate/1000)) + 'mbps</td><td>Wireless-' + wifiInfo.radio_proto + '<div class="ndl-smalltext2">' + str(wifiInfo.signal) + 'dBm (' + signalQuality + ')</div>' + apInfo + '</td>') | |
else: | |
port = str(findPortForMac(portMacs, mac)) | |
poeText = '' | |
if port == 'None': | |
port = 'Unknown' | |
speed = 'Unknown' | |
elif port == '0': | |
port = 'Infrastructure' | |
speed = 'N/A' | |
else: | |
speed = str(int(portSpeeds[port]/1000000)) + 'mbps/' + str(int(portSpeeds[port]/1000000)) + 'mbps' | |
if portPower[port] != None: | |
poeText = '<div class="ndl-smalltext2">PoE device - class ' + str(portPower[port]) + '</div>' | |
port = 'Switch port ' + port | |
htmlOut.write('<td>' + speed + '</td><td>' + str(port) + poeText + '</td>') | |
htmlOut.write('<td>' + getLeaseExpiry(ip) + '</td></tr>') | |
htmlOut.write('</table>') | |
htmlOut.write(makePowerGraph(switchAddress, switch_oid_poe_usage)) | |
htmlOut.write('</div>') | |
htmlOut.close() | |
print('Waiting five minutes before scanning again') | |
time.sleep(300) | |
print('Exiting!') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment