Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Network device scanner with DNS, UniFi query and switch port/PoE query
#!/usr/bin/python3
##
## Network device scanner with HTML output, DNS resolution, UniFi info and
## PoE/port info.
##
## Required commands: arping, snmpwalk, host
## Required Python3 libraries: requests, netaddr, urllib3
## Required infrastructure and services: UniFi controller, PoE switch with SNMP,
## dnsmasq (DHCP and DNS)
## Also requires ieee-data package installed.
##
## Finds devices on the local network using arping, then does DNS resolution.
## After this, the network switch is queried via SNMP to determine which port
## the device is connected to, and also whether it is using PoE, and, if so, how
## much power. The specified UniFi controller is also queried to determine if
## the device is connected via wireless, and, if so, via what AP and at what
## signal strength.
##
## All of this information is then collated and processed to produce an HTML
## table that is written to a file. It is intended to be embedded in other pages
## using SSI (Server-Side Includes). Re-runs every five minutes.
##
## Tested with:
## - Debian Buster
## - UniFi controller controlling two UniFi AP-AC-LITE
## - Nortel BayStack 5520 48T-PWR
## Network interface to bind to (handed to arping with -i)
bindInterface = 'eth0'
## IP address of the bound interface
bindAddress = '192.168.0.1'
## Network mask of the bound interface. bindAddress and bindMask together
## define the network whose addresses are scanned.
bindMask = '255.255.255.0'
## Number of scan threads to run concurrently (size of the thread pool)
scanThreads = 100
## Output file (rewritten on every scan cycle)
htmlOutFile = 'networkDevices.html'
## OUI MAC vendor database location (read once at start-up)
ouiPath = '/usr/share/ieee-data/oui.txt'
## dnsmasq leases file path
leasePath = '/var/lib/misc/dnsmasq.leases'
## DNS server ip (passed to the host command for reverse lookups)
dnsServer = '127.0.0.1'
## UniFi controller (contacted over HTTPS on port 8443)
unifiController = '127.0.0.1'
## UniFi controller username/password
unifiUsername = 'user'
unifiPassword = 'password'
## Names for different UniFi AP MACs. Keys are looked up verbatim against the
## ap_mac value reported by the controller, so they have to match it exactly
## (presumably lower-case with colons - verify against the controller output).
unifiNames = {'f0:9f:c2:aa:bb:cc': 'Inside wireless', 'f0:9f:c2:11:22:33': 'Outside wireless'}
## Switch address (queried with snmpwalk, SNMP v2c)
switchAddress = 'switch'
## Switch SNMP MAC OID prefix (matched against the forwarding table walk)
switch_oid_mac = 'iso.3.6.1.2.1.17.4.3.1.1'
## Switch SNMP port OID prefix (matched against the forwarding table walk)
switch_oid_port = 'iso.3.6.1.2.1.17.4.3.1.2'
## Switch SNMP speed OID prefix
switch_oid_speed = 'iso.3.6.1.2.1.2.2.1.5'
## Switch SNMP PoE state OID prefix (a value of 3 is treated as "delivering
## power" by getPortPower)
switch_oid_poe_state = 'iso.3.6.1.2.1.105.1.1.1.6.1'
## Switch SNMP PoE class OID prefix (the reported value minus one is shown as
## the PoE class)
switch_oid_poe_class = 'iso.3.6.1.2.1.105.1.1.1.10.1'
## Switch SNMP PoE usage (combined) counters prefix. Sub-OID .2.1 is read as
## the maximum power and .4.1 as the current usage.
switch_oid_poe_usage = 'iso.3.6.1.2.1.105.1.3.1.1'
###
import datetime
import html
import random
import re
import socket
import subprocess
import threading
import time
from multiprocessing.pool import ThreadPool

import netaddr
import requests
import urllib3
urllib3.disable_warnings()
class wirelessClient:
    """Plain record holding the per-station fields used in the report.

    Instances are built from the entries of the UniFi controller's station
    list; the constructor stores its arguments unchanged.
    """

    def __init__(self, mac, radio_proto, rx_rate, tx_rate, signal, ap_mac):
        ## mac:         MAC address of the station
        ## radio_proto: radio protocol name as reported by the controller
        ## rx_rate:     receive rate (presumably kbit/s - the report divides
        ## tx_rate:     transmit rate  by 1000 and labels the result mbps)
        ## signal:      signal strength, displayed as dBm
        ## ap_mac:      MAC address of the access point the station uses
        (
            self.mac,
            self.radio_proto,
            self.rx_rate,
            self.tx_rate,
            self.signal,
            self.ap_mac,
        ) = (mac, radio_proto, rx_rate, tx_rate, signal, ap_mac)
def getPortPower(switchAddress, oid_poe_state, oid_poe_class, community='public'):
    """Return the PoE class of every switch port that is delivering power.

    Two SNMP walks are made: one for the per-port PoE state and one for the
    per-port PoE class.

    Parameters:
        switchAddress: host name or address handed to snmpwalk.
        oid_poe_state: OID prefix of the per-port PoE state table.
        oid_poe_class: OID prefix of the per-port PoE class table.
        community: SNMP v2c community string (defaults to 'public').

    Returns:
        dict mapping the port index (str) to the reported class minus one
        when the port's state is 3 (delivering power), otherwise to None.
        Ports that show up in the class walk but not in the state walk are
        mapped to None as well.

    Raises:
        subprocess.CalledProcessError / OSError if snmpwalk fails or is not
        installed.
    """
    def walkIntegers(oid):
        ## The OID is escaped so that its dots only match literal dots.
        pattern = re.compile(re.escape(oid) + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)')
        output = subprocess.check_output(['snmpwalk', '-c', community, '-v', '2c', switchAddress, oid])
        values = dict()
        for line in output.decode('utf-8', errors='replace').split('\n'):
            found = pattern.search(line)
            if found:
                values[found.group(1)] = int(found.group(2))
        return values

    poeState = walkIntegers(oid_poe_state)
    poeClass = dict()
    for port, pclass in walkIntegers(oid_poe_class).items():
        ## only add class if delivering power. otherwise set None
        if poeState.get(port) == 3:
            poeClass[port] = pclass - 1
        else:
            poeClass[port] = None
    return poeClass
def getPortSpeeds(switchAddress, oid_speed, community='public'):
    """Return the speed reported by the switch for each port.

    Parameters:
        switchAddress: host name or address handed to snmpwalk.
        oid_speed: OID prefix of the per-port speed table.
        community: SNMP v2c community string (defaults to 'public').

    Returns:
        dict mapping the port index (str) to the Gauge32 value reported for
        it (int). Lines that do not belong to the speed table are ignored.

    Raises:
        subprocess.CalledProcessError / OSError if snmpwalk fails or is not
        installed.
    """
    ## Escape the OID so its dots are literal, and compile the pattern once
    ## instead of for every output line.
    pattern = re.compile(re.escape(oid_speed) + r'\.(\d+)\s=\sGauge32:\s(\d+)')
    output = subprocess.check_output(['snmpwalk', '-c', community, '-v', '2c', switchAddress, oid_speed])
    lines = output.decode('utf-8', errors='replace').split('\n')
    matches = (pattern.search(line) for line in lines)
    return {found.group(1): int(found.group(2)) for found in matches if found}
def getPortMacs(switchAddress, oid_mac, oid_port, community='public'):
    """Return the MAC address / switch port pairs from the forwarding table.

    The table .1.3.6.1.2.1.17.4.3 is walked once. Rows matching oid_mac give
    the MAC address of an entry, rows matching oid_port give its port; the
    two are joined on the entry index that follows the OID prefix.

    Parameters:
        switchAddress: host name or address handed to snmpwalk.
        oid_mac: OID prefix of the MAC address column, as printed by snmpwalk.
        oid_port: OID prefix of the port column, as printed by snmpwalk.
        community: SNMP v2c community string (defaults to 'public').

    Returns:
        list of [mac, port] lists, mac formatted 'AA:BB:CC:DD:EE:FF' and
        port as str, in the order the MAC rows were reported. Entries for
        which only a MAC or only a port was reported are left out.

    Raises:
        subprocess.CalledProcessError / OSError if snmpwalk fails or is not
        installed.
    """
    pattern_mac = re.compile(re.escape(oid_mac) + r'\.([\d\.]+)\s=\sHex-STRING:\s((?:[A-F0-9]{2}\s){5}[A-F0-9]{2})')
    pattern_port = re.compile(re.escape(oid_port) + r'\.([\d\.]+)\s=\sINTEGER:\s(\d+)')
    output = subprocess.check_output(['snmpwalk', '-c', community, '-v', '2c', switchAddress, '.1.3.6.1.2.1.17.4.3'])
    macs = dict()
    ports = dict()
    for line in output.decode('utf-8', errors='replace').split('\n'):
        found = pattern_mac.search(line)
        if found:
            macs[found.group(1)] = ':'.join(found.group(2).split(' '))
            continue
        found = pattern_port.search(line)
        if found:
            ports[found.group(1)] = found.group(2)
    ## Join on the entry index; incomplete entries would otherwise raise.
    return [[mac, ports[entry]] for entry, mac in macs.items() if entry in ports]
def makePowerGraph(switchAddress, oid_poe_usage, community='public'):
    """Return an HTML bar showing how much of the switch's PoE budget is used.

    The PoE usage counters are walked; sub-OID .2.1 is taken as the maximum
    power and .4.1 as the power currently in use.

    Parameters:
        switchAddress: host name or address handed to snmpwalk.
        oid_poe_usage: OID prefix of the combined PoE usage counters.
        community: SNMP v2c community string (defaults to 'public').

    Returns:
        str: a title div with the usage percentage followed by a one-row
        table of 100 cells; the first cells carry class "ndl-poeUsage-on",
        the rest "ndl-poeUsage-off". When no (or a zero) maximum is
        reported the usage is shown as 0%.

    Raises:
        subprocess.CalledProcessError / OSError if snmpwalk fails or is not
        installed.
    """
    power_max = 0
    power_cur = 0
    prefix = re.escape(oid_poe_usage)
    pattern_max = re.compile(prefix + r'\.2\.1\s=\sGauge32:\s(\d+)')
    pattern_cur = re.compile(prefix + r'\.4\.1\s=\sGauge32:\s(\d+)')
    output = subprocess.check_output(['snmpwalk', '-c', community, '-v', '2c', switchAddress, oid_poe_usage])
    for line in output.decode('utf-8', errors='replace').split('\n'):
        found_max = pattern_max.search(line)
        found_cur = pattern_cur.search(line)
        if found_max:
            power_max = int(found_max.group(1))
        elif found_cur:
            power_cur = int(found_cur.group(1))
    ## Integer arithmetic avoids float rounding; a missing or zero maximum
    ## would otherwise be a division by zero.
    if power_max > 0:
        percent = power_cur * 100 // power_max
    else:
        percent = 0
    ## The bar is always exactly 100 cells wide, whatever the switch reports.
    cellsOn = max(0, min(100, percent))
    return ''.join([
        '<div id="ndl-poeUsage-title">Switch PoE usage - ' + str(percent) + '%</div><table id="ndl-poeUsage"><tr>',
        '<td class="ndl-poeUsage-on">&nbsp;</td>' * cellsOn,
        '<td class="ndl-poeUsage-off">&nbsp;</td>' * (100 - cellsOn),
        '</tr></table>',
    ])
def findPortForMac(macPort, smac):
    """Look up the switch port a MAC address was learned on.

    macPort is a sequence of (mac, port) pairs; the comparison ignores
    letter case. Returns the port of the first matching pair, or None when
    no pair matches.
    """
    hits = (port for mac, port in macPort if smac.upper() == mac.upper())
    return next(hits, None)
def getWirelessClients(unifiController, unifiUsername, unifiPassword, timeout=10):
    """Fetch the list of stations known to the UniFi controller.

    Logs in to the controller over HTTPS (port 8443, certificate not
    verified) and reads the station list of the 'default' site.

    Parameters:
        unifiController: host name or address of the controller.
        unifiUsername: login name.
        unifiPassword: login password.
        timeout: seconds to wait for each HTTP request (default 10), so an
            unresponsive controller cannot stall the scanner indefinitely.

    Returns:
        list of wirelessClient objects. The list is empty when the
        controller cannot be queried; entries that lack one of the expected
        fields are skipped instead of discarding the rest of the list.
    """
    print('Getting UniFi wireless clients from ' + unifiController)
    baseUrl = 'https://' + unifiController + ':8443/api'
    clientList = []
    try:
        payload = {'username': unifiUsername, 'password': unifiPassword}
        login = requests.post(baseUrl + '/login', json = payload, verify = False, timeout = timeout)
        login.raise_for_status()
        sta = requests.get(baseUrl + '/s/default/stat/sta', cookies = login.cookies, verify = False, timeout = timeout)
        sta.raise_for_status()
        stations = sta.json()['data']
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as err:
        ## Best effort: without the controller every device is simply
        ## reported as wired/unknown.
        print('Failed to get wireless clients from UniFi!', err)
        return clientList
    for client in stations:
        try:
            clientList.append(wirelessClient(client['mac'], client['radio_proto'].upper(), client['rx_rate'], client['tx_rate'], client['signal'], client['ap_mac']))
        except (KeyError, TypeError, AttributeError):
            ## Entry without the wireless fields - nothing to record for it.
            continue
    return clientList
def getClientInfo(clientList, mac):
    """Find the wireless client record that belongs to a MAC address.

    Each element of clientList must expose a .mac attribute; the comparison
    ignores letter case. Returns the first matching element, or None when
    there is no match.
    """
    candidates = (entry for entry in clientList if entry.mac.upper() == mac.upper())
    return next(candidates, None)
def getHost(ip, server=None):
    """Resolve an IP address to a short host name using the host command.

    Parameters:
        ip: address to look up.
        server: DNS server to query; defaults to the module-level dnsServer.

    Returns:
        str: the first label of the first "domain name pointer" answer, or
        'Unknown' when the lookup fails or yields no pointer record.
    """
    if server is None:
        server = dnsServer
    try:
        output = subprocess.check_output(['host', '-W', '1', '-r', ip, server])
    except (subprocess.SubprocessError, OSError):
        ## host exits non-zero for addresses without a PTR record
        return 'Unknown'
    ## Decode the bytes properly; str() on bytes yields the repr, in which
    ## line breaks are escaped and never split.
    for line in output.decode('utf-8', errors='replace').splitlines():
        regexp_host = re.search(r'domain name pointer ([^\.]+)\.', line)
        if regexp_host:
            return regexp_host.group(1)
    return 'Unknown'
def getLeaseExpiry(ip, path=None):
    """Return when the DHCP lease of an IP address expires.

    The leases file is read line by line; the first field of a line is
    taken as the expiry time (seconds since the epoch) and the third field
    as the leased IP address.

    Parameters:
        ip: address to look up.
        path: leases file to read; defaults to the module-level leasePath.

    Returns:
        str: the expiry formatted like 'Tue, Nov 14 2023 22:13:20' (local
        time), or 'Static assignment' when the address has no lease entry
        or the entry's expiry field cannot be parsed.

    Raises:
        OSError if the leases file cannot be opened.
    """
    if path is None:
        path = leasePath
    expiry = 'Static assignment'
    with open(path, 'r') as leases:
        for line in leases:
            fields = line.split()
            ## Blank lines and short records (e.g. a "duid ..." line) carry
            ## no address field and must be skipped, not indexed.
            if len(fields) < 3 or fields[2] != ip:
                continue
            try:
                leaseExpiry = int(fields[0])
                expiry = datetime.datetime.fromtimestamp(leaseExpiry).strftime('%a, %b %d %Y %H:%M:%S')
            except (ValueError, OverflowError, OSError):
                pass
            break
    return expiry
## Guards the shared progress counter: checkHost runs concurrently in the
## scan thread pool and "doneCount += 1" is not an atomic operation.
_checkHostLock = threading.Lock()


def checkHost(ip):
    """Probe one IP address with arping and record the device if it answers.

    Runs in a scan thread. Increments the module-level doneCount, prints a
    progress line, and - when arping reports a MAC address - appends
    [ip, mac, vendor] to the module-level foundDevices list.

    Parameters:
        ip: address to probe on bindInterface.

    Returns:
        None. Addresses that do not answer (arping exits non-zero), or for
        which arping cannot be run, are silently skipped.
    """
    global doneCount
    with _checkHostLock:
        doneCount += 1
        done = doneCount
    print('\r Checking ' + str(done) + '/' + str(ipLength) + ' (' + str(ip) + '). Found ' + str(len(foundDevices)) + ' device(s)...', end = '', flush = True)
    try:
        arpOutput = subprocess.check_output(['arping', '-r', '-c', '3', '-C', '1', '-i', bindInterface, ip])
    except (subprocess.SubprocessError, OSError):
        ## no reply from this address
        return
    ## Pick the first well-formed MAC out of the output rather than blindly
    ## slicing the first 17 characters.
    found = re.search(r'(?:[0-9A-F]{2}:){5}[0-9A-F]{2}', arpOutput.decode('utf-8', errors='replace').upper())
    if not found:
        return
    mac = found.group(0)
    foundDevices.append([ip, mac, getVendor(mac)])
def getTargetIPs():
    """Return every address of the configured network as a list of strings.

    The network is built from the module-level bindAddress and bindMask.
    """
    network = netaddr.IPNetwork(bindAddress + '/' + bindMask)
    return [str(address) for address in network]
def getVendor(mac, ouiText=None):
    """Return the vendor registered for the OUI (first three octets) of a MAC.

    Parameters:
        mac: MAC address; octets may be separated by ':' or '-', letter
            case is ignored.
        ouiText: contents of the OUI database to search; defaults to the
            module-level ouiData. Entries are expected in the form
            'F0-9F-C2   (hex)   Vendor name'.

    Returns:
        str: the vendor name, or 'Unknown' when the address is malformed or
        its prefix is not listed.
    """
    if ouiText is None:
        ouiText = ouiData
    octets = re.match(r'([0-9A-F]{2})[:-]?([0-9A-F]{2})[:-]?([0-9A-F]{2})', str(mac).strip().upper())
    if not octets:
        return 'Unknown'
    ## The database writes prefixes with dashes.
    prefix = '-'.join(octets.groups())
    ## One multi-line search for this prefix instead of splitting the whole
    ## database and matching every line on each call.
    entry = re.search(r'^[ \t]*' + re.escape(prefix) + r'\s+\(hex\)\s+(.+)$', ouiText, re.MULTILINE)
    if entry:
        ## strip() drops the '\r' left over from CRLF line endings
        return entry.group(1).strip()
    return 'Unknown'
def _signalQuality(signal):
    """Return the label shown next to a wireless signal strength (dBm)."""
    if signal < -90:
        return 'Unusable'
    if signal < -80:
        return 'Poor'
    if signal < -70:
        return 'Average'
    if signal < -67:
        return 'Good'
    if signal < -30:
        return 'Excellent'
    return 'Unknown'


def _wirelessCells(wifiInfo):
    """Return the speed and connection-method cells for a wireless client."""
    ## shorten the radio protocol names reported by the controller
    radioProto = {'NA': 'N', 'NG': 'G'}.get(wifiInfo.radio_proto, wifiInfo.radio_proto)
    apName = unifiNames.get(wifiInfo.ap_mac)
    apInfo = ''
    if apName is not None:
        apInfo = '<div class="ndl-smalltext2">Connected to ' + apName + '</div>'
    return ('<td>' + str(int(wifiInfo.tx_rate/1000)) + 'mbps/' + str(int(wifiInfo.rx_rate/1000)) + 'mbps</td>'
            + '<td>Wireless-' + radioProto
            + '<div class="ndl-smalltext2">' + str(wifiInfo.signal) + 'dBm (' + _signalQuality(wifiInfo.signal) + ')</div>'
            + apInfo + '</td>')


def _wiredCells(mac, portMacs, portSpeeds, portPower):
    """Return the speed and connection-method cells for a non-wireless device."""
    port = findPortForMac(portMacs, mac)
    if port is None:
        return '<td>Unknown</td><td>Unknown</td>'
    port = str(port)
    if port == '0':
        return '<td>N/A</td><td>Infrastructure</td>'
    ## A port missing from the speed/PoE tables must not abort the report.
    speed = 'Unknown'
    if port in portSpeeds:
        mbps = str(int(portSpeeds[port]/1000000))
        speed = mbps + 'mbps/' + mbps + 'mbps'
    poeText = ''
    if portPower.get(port) is not None:
        poeText = '<div class="ndl-smalltext2">PoE device - class ' + str(portPower[port]) + '</div>'
    return '<td>' + speed + '</td><td>Switch port ' + port + poeText + '</td>'


print('Initialising..')
pool = ThreadPool(processes=scanThreads)
print('Reading OUI database..')
with open(ouiPath, encoding = 'utf-8') as ouiDB:
    ouiData = ouiDB.read()
print('Getting IP list..')
## get the IP list in random order
ipList = getTargetIPs()
random.shuffle(ipList)
ipLength = len(ipList)
## run until interrupted
try:
    while True:
        ## foundDevices and doneCount are module globals shared with checkHost
        foundDevices = []
        doneCount = 0
        print('Starting scan threads..')
        ## Start from an empty list every cycle; keeping the results of
        ## earlier cycles would grow without bound and be waited on again.
        childThreads = [pool.apply_async(checkHost, (ipAddress, )) for ipAddress in ipList]
        wifiClients = getWirelessClients(unifiController, unifiUsername, unifiPassword)
        portMacs = getPortMacs(switchAddress, switch_oid_mac, switch_oid_port)
        portSpeeds = getPortSpeeds(switchAddress, switch_oid_speed)
        portPower = getPortPower(switchAddress, switch_oid_poe_state, switch_oid_poe_class)
        print('Waiting for scan to complete..')
        for ct in childThreads:
            try:
                ct.get(30)
            except Exception:
                ## a probe that timed out or failed just yields no device
                pass
        time.sleep(2)
        print('Scan complete')
        ## Sort the list by IP. Uses socket.inet_aton to convert the IPs to an easily sortable string representation
        foundDevices.sort(key=lambda k:(socket.inet_aton(k[0])))
        ## do html output
        print('Writing HTML output..')
        ## The page is assembled in memory first so the output file is not
        ## left truncated while the (slow) host lookups run.
        page = ['<div id="networkDevices">']
        page.append('Last updated ' + datetime.datetime.now().strftime('%a, %b %d %Y at %H:%M:%S'))
        page.append('<table id="networkDeviceList"><tr><th>Hostname</th><th>MAC address</th><th>Speed (TX/RX)</th><th>Connection method</th><th>DHCP lease expires</th></tr>')
        for ip, mac, vendor in foundDevices:
            wifiInfo = getClientInfo(wifiClients, mac)
            ## Host and vendor names come from outside the script, so they
            ## are escaped before being embedded in the page.
            page.append('<tr><td>' + html.escape(getHost(ip)) + '<div class="ndl-smalltext2">' + str(ip) + '</div></td><td>' + str(mac) + '<div class="ndl-smalltext1">' + html.escape(str(vendor)) + '</div></td>')
            if wifiInfo is not None:
                page.append(_wirelessCells(wifiInfo))
            else:
                page.append(_wiredCells(mac, portMacs, portSpeeds, portPower))
            page.append('<td>' + getLeaseExpiry(ip) + '</td></tr>')
        page.append('</table>')
        page.append(makePowerGraph(switchAddress, switch_oid_poe_usage))
        page.append('</div>')
        with open(htmlOutFile, mode='w') as htmlOut:
            htmlOut.write(''.join(page))
        print('Waiting five minutes before scanning again')
        time.sleep(300)
except KeyboardInterrupt:
    ## Ctrl-C is the only way out of the loop
    pass
print('Exiting!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment