Last active
August 26, 2023 17:34
-
-
Save nickadam/7a89b80235d34074dc4f59afda15c070 to your computer and use it in GitHub Desktop.
Get switch interface MAC addresses via SNMP (tested with ArubaOS, Aruba CX, Aruba wireless controllers, and Cisco)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher | |
from pysnmp.carrier.asyncore.dgram import udp | |
from pyasn1.codec.ber import encoder, decoder | |
from pysnmp.proto.api import v2c | |
from pysnmp.proto import api | |
from time import time | |
from collections import OrderedDict | |
import copy | |
import json | |
import re | |
import csv | |
import sys | |
# Module-level state shared between the SNMP fetch helpers and their callbacks.
# startedAt: timestamp reset by get_bulk_oid_f()/get_oid_f() when a request
# starts and again after each matched response; the timer callbacks compare
# against it to detect a stall of more than 5 seconds.
startedAt = time()
# retData: rows collected by the most recent fetch, keyed by full OID string.
# It is replaced on every fetch, so callers deep-copy it right after each call.
retData = OrderedDict()
def map_mac_to_interface(mac_addresses, vlan_id=0):
    """Group learned MAC addresses by the port value they were reported on.

    ``mac_addresses`` is a mapping of walk rows (as stored in ``retData``).
    Each row's ``oid_end`` has the form ``<column>.<mac in dotted decimal>``:

    * column ``1`` rows carry the MAC address in ``value`` (a leading ``0x``
      is stripped),
    * column ``2`` rows carry the port in ``value``.

    Column ``1`` rows must come before the matching column ``2`` rows; a
    column ``2`` row whose MAC has not been seen is reported and skipped.

    Args:
        mac_addresses: Mapping of full OID -> row dict with ``oid_end`` and
            ``value`` keys.
        vlan_id: Optional VLAN identifier (str or int). When truthy it is
            appended to every MAC as ``<mac>@<vlan_id>``.

    Returns:
        OrderedDict mapping port value -> list of MAC strings.
    """
    mac_map = OrderedDict()
    dd_map = OrderedDict()  # dotted-decimal MAC index -> hex MAC string
    for data in mac_addresses.values():
        # Split once: the first component is the column, the rest is the index.
        column, _, mac_dd = data['oid_end'].partition('.')
        if column == '1':
            dd_map[mac_dd] = data['value'].replace('0x', '')
        elif column == '2':
            if mac_dd not in dd_map:
                print('Mac dotted decimal interface not in mac table')
                continue
            mac = dd_map[mac_dd]
            if vlan_id:
                # str() so an integer VLAN id works as well as a string one.
                mac = mac + '@' + str(vlan_id)
            mac_map.setdefault(data['value'], []).append(mac)
    return mac_map
def get_bulk_oid_f(host, community, oidstr, ignore_timeout=False):
    """Walk the subtree below ``oidstr`` on ``host`` using SNMPv2c GETBULK.

    Collected rows are not returned by this function: they are stored in the
    module-level ``retData`` (reset at the start of every call), keyed by the
    full OID string. Each value is
    ``{'oid': oidstr, 'oid_end': <OID with 'oidstr.' removed>,
    'value': <pretty-printed value>}``.

    Args:
        host: Agent address; requests are sent to UDP port 161.
        community: SNMP community string.
        oidstr: Dotted OID of the subtree to walk.
        ignore_timeout: When true, a stall of more than 5 seconds ends the
            walk quietly (keeping whatever was collected) instead of raising.

    Raises:
        ValueError: When no progress is made for more than 5 seconds and
            ``ignore_timeout`` is false.
    """
    global retData
    global startedAt
    startedAt = time()
    retData = OrderedDict()
    # SNMP table header
    headVars = [v2c.ObjectIdentifier(oidstr)]
    # Build PDU
    reqPDU = v2c.GetBulkRequestPDU()
    v2c.apiBulkPDU.setDefaults(reqPDU)
    v2c.apiBulkPDU.setNonRepeaters(reqPDU, 0)
    v2c.apiBulkPDU.setMaxRepetitions(reqPDU, 25)
    v2c.apiBulkPDU.setVarBinds(reqPDU, [(x, v2c.null) for x in headVars])
    # Build message
    reqMsg = v2c.Message()
    v2c.apiMessage.setDefaults(reqMsg)
    v2c.apiMessage.setCommunity(reqMsg, community)
    v2c.apiMessage.setPDU(reqMsg, reqPDU)

    def cbTimerFun(timeNow):
        # Called by the dispatcher on timer ticks; startedAt is refreshed
        # after every matched response, so this measures time without progress.
        if timeNow - startedAt > 5:
            if not ignore_timeout:
                raise ValueError("Request timed out")
            transportDispatcher.jobFinished(1)

    # noinspection PyUnusedLocal
    def cbRecvFun(
        transportDispatcher,
        transportDomain,
        transportAddress,
        wholeMsg,
        reqPDU=reqPDU,
        headVars=headVars,
    ):
        global startedAt
        while wholeMsg:
            rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=v2c.Message())
            rspPDU = v2c.apiMessage.getPDU(rspMsg)
            # Match response to request
            if v2c.apiBulkPDU.getRequestID(reqPDU) == v2c.apiBulkPDU.getRequestID(rspPDU):
                # Format var-binds table
                varBindTable = v2c.apiBulkPDU.getVarBindTable(reqPDU, rspPDU)
                # Check for SNMP errors reported (status 2 is let through)
                errorStatus = v2c.apiBulkPDU.getErrorStatus(rspPDU)
                if errorStatus and errorStatus != 2:
                    errorIndex = v2c.apiBulkPDU.getErrorIndex(rspPDU)
                    print(
                        "{} at {}".format(
                            errorStatus.prettyPrint(),
                            errorIndex and varBindTable[int(errorIndex) - 1] or "?",
                        )
                    )
                    transportDispatcher.jobFinished(1)
                    break
                # Collect rows that still belong to the requested subtree
                stop = False
                for tableRow in varBindTable:
                    for name, val in tableRow:
                        # Substring test: a name without 'oidstr.' means the
                        # walk has left the requested subtree.
                        if oidstr + '.' not in name.prettyPrint():
                            stop = True
                            break
                        oid_end = name.prettyPrint().replace(oidstr + '.', '')
                        retData[name.prettyPrint()] = {'oid': oidstr, 'oid_end': oid_end, 'value': val.prettyPrint()}
                # Stop on end of missing headvar
                # NOTE(review): the code below still runs after this, so one
                # more request is queued and the EOM check may call
                # jobFinished a second time; confirm whether that is intended.
                if stop:
                    transportDispatcher.jobFinished(1)
                # Stop on EOM
                for oid, val in varBindTable[-1]:
                    if not isinstance(val, v2c.Null):
                        break
                else:
                    transportDispatcher.jobFinished(1)
                # Generate request for next row
                v2c.apiBulkPDU.setVarBinds(
                    reqPDU, [(x, v2c.null) for x, y in varBindTable[-1]]
                )
                v2c.apiBulkPDU.setRequestID(reqPDU, v2c.getNextRequestID())
                transportDispatcher.sendMessage(
                    encoder.encode(reqMsg), transportDomain, transportAddress
                )
                if time() - startedAt > 5:
                    if not ignore_timeout:
                        raise ValueError("Request timed out")
                    transportDispatcher.jobFinished(1)
                # Progress was made: restart the stall timer.
                startedAt = time()
        return retData

    transportDispatcher = AsyncoreDispatcher()
    transportDispatcher.registerRecvCbFun(cbRecvFun)
    transportDispatcher.registerTimerCbFun(cbTimerFun)
    transportDispatcher.registerTransport(
        udp.domainName, udp.UdpSocketTransport().openClientMode()
    )
    try:
        transportDispatcher.sendMessage(
            encoder.encode(reqMsg), udp.domainName, (host, 161)
        )
        transportDispatcher.jobStarted(1)
        # Dispatcher will finish as job#1 counter reaches zero
        transportDispatcher.runDispatcher()
    finally:
        # Always release the UDP transport, including when a timeout raises;
        # otherwise each retry would leave an open socket behind.
        transportDispatcher.closeDispatcher()
def get_bulk_oid(host, community, oidstr, ignore_timeout=False, retry=0):
    """Run get_bulk_oid_f, retrying on ValueError until the retry count hits 3.

    Args:
        host: Agent address.
        community: SNMP community string.
        oidstr: Dotted OID of the subtree to walk.
        ignore_timeout: Passed through to get_bulk_oid_f.
        retry: Number of attempts already used.

    Returns:
        Whatever get_bulk_oid_f returns.

    Raises:
        ValueError: When the retry count reaches 3 for any OID other than
            the initial system query.
        SystemExit: With status 1 when the retry count reaches 3 for the
            initial system query ('1.3.6.1.2.1.1.1').
    """
    attempts = retry
    while True:
        if attempts == 3:
            # The system walk is the first request made to a host, so
            # exhausting it means the host is unreachable: report and exit.
            if oidstr == '1.3.6.1.2.1.1.1':
                print(host, 'timeout', file=sys.stderr)
                sys.exit(1)
            raise ValueError("Too many retries from get_bulk_oid", host, oidstr)
        try:
            return get_bulk_oid_f(host, community, oidstr, ignore_timeout)
        except ValueError:
            attempts = attempts + 1
            print(host, 'retrying', oidstr, file=sys.stderr)
def get_oid_f(host, community, oidstr):
    """Walk the subtree below ``oidstr`` on ``host`` using SNMPv1 GETNEXT.

    Collected rows are stored in the module-level ``retData`` (reset at the
    start of every call), keyed by the full OID string, with the same row
    layout as get_bulk_oid_f: ``{'oid', 'oid_end', 'value'}``. The function
    itself returns nothing.

    Args:
        host: Agent address; requests are sent to UDP port 161.
        community: SNMP community string.
        oidstr: Dotted OID of the subtree to walk.

    Raises:
        ValueError: When no progress is made for more than 5 seconds, or
            when the agent reports an error status other than 2.
    """
    global startedAt
    global retData
    retData = OrderedDict()
    startedAt = time()
    # Protocol version to use
    pMod = api.protoModules[api.protoVersion1]
    # SNMP table header
    headVars = [pMod.ObjectIdentifier(oidstr)]
    # Build PDU
    reqPDU = pMod.GetNextRequestPDU()
    pMod.apiPDU.setDefaults(reqPDU)
    pMod.apiPDU.setVarBinds(reqPDU, [(x, pMod.null) for x in headVars])
    # Build message
    reqMsg = pMod.Message()
    pMod.apiMessage.setDefaults(reqMsg)
    pMod.apiMessage.setCommunity(reqMsg, community)
    pMod.apiMessage.setPDU(reqMsg, reqPDU)

    def cbTimerFun(timeNow):
        # startedAt is refreshed after every matched response, so this
        # measures time without progress.
        if timeNow - startedAt > 5:
            raise ValueError("Request timed out")

    # noinspection PyUnusedLocal
    def cbRecvFun(
        transportDispatcher,
        transportDomain,
        transportAddress,
        wholeMsg,
        reqPDU=reqPDU,
        headVars=headVars,
    ):
        global startedAt
        while wholeMsg:
            rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
            rspPDU = pMod.apiMessage.getPDU(rspMsg)
            # Match response to request
            if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
                # Check for SNMP errors reported (status 2 is let through)
                errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
                if errorStatus and errorStatus != 2:
                    raise ValueError(errorStatus)
                # Format var-binds table
                varBindTable = pMod.apiPDU.getVarBindTable(reqPDU, rspPDU)
                stop = False
                for tableRow in varBindTable:
                    for name, val in tableRow:
                        # Substring test: a name without 'oidstr.' means the
                        # walk has left the requested subtree.
                        if oidstr + '.' not in name.prettyPrint():
                            stop = True
                            break
                        oid_end = name.prettyPrint().replace(oidstr + '.', '')
                        retData[name.prettyPrint()] = {'oid': oidstr, 'oid_end': oid_end, 'value': val.prettyPrint()}
                # Stop on end of missing headvar
                if stop:
                    transportDispatcher.jobFinished(1)
                # Stop on EOM
                for oid, val in varBindTable[-1]:
                    if not isinstance(val, pMod.Null):
                        break
                else:
                    transportDispatcher.jobFinished(1)
                # Generate request for next row
                pMod.apiPDU.setVarBinds(
                    reqPDU, [(x, pMod.null) for x, y in varBindTable[-1]]
                )
                pMod.apiPDU.setRequestID(reqPDU, pMod.getNextRequestID())
                transportDispatcher.sendMessage(
                    encoder.encode(reqMsg), transportDomain, transportAddress
                )
                if time() - startedAt > 5:
                    raise ValueError("Request timed out")
                # Progress was made: restart the stall timer.
                startedAt = time()
        return retData

    transportDispatcher = AsyncoreDispatcher()
    transportDispatcher.registerRecvCbFun(cbRecvFun)
    transportDispatcher.registerTimerCbFun(cbTimerFun)
    transportDispatcher.registerTransport(
        udp.domainName, udp.UdpSocketTransport().openClientMode()
    )
    try:
        transportDispatcher.sendMessage(
            encoder.encode(reqMsg), udp.domainName, (host, 161)
        )
        transportDispatcher.jobStarted(1)
        # Dispatcher will finish as job#1 counter reaches zero
        transportDispatcher.runDispatcher()
    finally:
        # Always release the UDP transport, including when a callback raises;
        # otherwise each retry would leave an open socket behind.
        transportDispatcher.closeDispatcher()
def get_oid(host, community, oidstr, retry=0):
    """Run get_oid_f, retrying on ValueError until the retry count hits 3.

    Args:
        host: Agent address.
        community: SNMP community string.
        oidstr: Dotted OID of the subtree to walk.
        retry: Number of attempts already used.

    Returns:
        Whatever get_oid_f returns.

    Raises:
        ValueError: When the retry count reaches 3.
    """
    attempts = retry
    while True:
        if attempts == 3:
            # The message names this function (it previously said
            # "get_bulk_oid", which pointed at the wrong code path).
            raise ValueError("Too many retries from get_oid", host, oidstr)
        try:
            return get_oid_f(host, community, oidstr)
        except ValueError:
            attempts = attempts + 1
            print(host, 'retrying', oidstr, file=sys.stderr)
def get_tag_interfaces(hexcode):
    """Return the 1-based positions of the bits set in a hex bitmap.

    Position 1 is the most significant bit of the first hex digit, so every
    hex digit contributes exactly four positions. The bit string is
    zero-padded to that width; without the padding, leading zero bits would
    be dropped and every position shifted (``0x40`` would report 1, not 2).

    Args:
        hexcode: Hex string, with or without ``0x``.

    Returns:
        List of positions as strings, in ascending order. An empty bitmap
        yields an empty list.

    Raises:
        ValueError: If ``hexcode`` contains non-hexadecimal characters.
    """
    digits = hexcode.replace('0x', '')
    if not digits:
        return []
    width = len(digits) * 4
    bits = format(int(digits, 16), '0{}b'.format(width))
    return [str(pos) for pos, bit in enumerate(bits, start=1) if bit == '1']
def get_cisco_vlan_macs(host, community, vlan_names):
    """Collect learned MAC addresses per interface and VLAN on a Cisco switch.

    Each VLAN is queried separately with the community string
    ``<community>@<vlan_id>``. Background:
    https://www.cisco.com/c/en/us/support/docs/ip/simple-network-management-protocol-snmp/44800-mactoport44800.html
    https://www.cisco.com/c/en/us/support/docs/ip/simple-network-management-protocol-snmp/40700-snmp-ifIndex40700.html

    Args:
        host: Agent address.
        community: Base SNMP community string.
        vlan_names: Mapping of VLAN id (str) -> VLAN name. VLANs whose name
            contains ``-default`` (case-insensitive) are skipped.

    Returns:
        OrderedDict shaped ``{'interface_id': {'vlan_id': ['mac']}}``.
    """
    global retData
    wanted_vlans = [
        vid for vid, label in vlan_names.items()
        if not re.search('-default', label, re.I)
    ]
    per_interface = OrderedDict()
    for vlan_id in wanted_vlans:
        vlan_community = community + '@' + vlan_id
        get_bulk_oid(host, vlan_community, '1.3.6.1.2.1.17.4.3.1')  # mac address
        fdb_rows = copy.deepcopy(retData)
        if len(fdb_rows) == 0:
            # Nothing learned in this VLAN; skip the base-port query.
            continue
        get_bulk_oid(host, vlan_community, '1.3.6.1.2.1.17.1.4.1.2')  # base ports
        port_rows = copy.deepcopy(retData)
        # Port id -> interface id, as reported by the base-port table.
        port_to_interface = {
            row['oid_end']: row['value'] for row in port_rows.values()
        }
        for port_id, macs in map_mac_to_interface(fdb_rows, vlan_id).items():
            # Fall back to the raw port id when no base-port row matches it.
            interface_id = port_to_interface.get(port_id, port_id)
            per_interface.setdefault(interface_id, OrderedDict())[vlan_id] = macs
    return per_interface
def get_switch_interface_info(host, community):
    """Detect the device family from its system walk and collect its data.

    Walks '1.3.6.1.2.1.1.1' and inspects the value of the last row returned.
    A value starting with ``0x`` is decoded from hex to UTF-8 text first.
    If the text contains "cisco" (case-insensitive) the Cisco collector is
    used, otherwise the Aruba collector.

    Args:
        host: Agent address.
        community: SNMP community string.

    Returns:
        The result of get_cisco_info or get_aruba_info.
    """
    global retData
    get_bulk_oid(host, community, '1.3.6.1.2.1.1.1')  # system
    system_rows = list(copy.deepcopy(retData).values())
    # Use the last row's value; an empty walk leaves the description empty.
    description = system_rows[-1]['value'] if system_rows else ''
    if description.startswith('0x'):
        description = bytes.fromhex(description.replace('0x', '')).decode('utf-8')
    if re.search('cisco', description, re.I):
        return get_cisco_info(host, community)
    return get_aruba_info(host, community)
def get_aruba_controller_info(host, community, vlans):
    """Export the client table of an Aruba wireless controller to CSV.

    Walks seven columns under '1.3.6.1.4.1.14823.2.2.1.1.2.1.1', merges them
    per client (a client is identified by the ``oid_end`` of its row, which
    is also written out as the ``ip`` field) and writes the result to
    ``<sys.argv[1]>_wifi.csv``.

    Args:
        host: Agent address.
        community: SNMP community string.
        vlans: Already-fetched rows of the per-client VLAN column, in the
            same row layout as ``retData``.

    Returns:
        None. The output is the CSV file; rows of any column that reference
        an unknown client are reported on stdout.
    """
    def walk(oid):
        # Fetch one column and snapshot it before the next fetch replaces it.
        get_bulk_oid(host, community, oid)
        return copy.deepcopy(retData)

    mac_addresses = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.2')  # mac address
    usernames = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.3')  # username
    roles = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.4')  # role
    uptime = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.5')  # uptime
    auth_methods = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.6')  # auth method
    access_points = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.7')  # access point
    bandwith_contracts = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.12')  # bandwith

    fieldnames = [
        'mac',
        'ip',
        'auth',
        'username',
        'vlan',
        'ap',
        'role',
        'bandwith',
        'uptime',
    ]
    # Numeric auth-method codes and the labels written to the CSV; unknown
    # codes are passed through unchanged.
    auth_labels = {
        '1': 'none',
        '2': 'other',
        '3': 'web',
        '4': 'dot1x',
        '5': 'vpn',
        '6': 'mac',
    }

    # One record per MAC-table row, every field defaulting to ''.
    clients = OrderedDict()
    for data in mac_addresses.values():
        client_id = data['oid_end']
        client = OrderedDict((name, '') for name in fieldnames)
        client['mac'] = data['value'].replace('0x', '')
        client['ip'] = client_id
        clients[client_id] = client

    # (CSV field, fetched rows, label used in the "missing" message), merged
    # in this order so stdout output keeps its original sequence.
    columns = [
        ('auth', auth_methods, 'auth method'),
        ('username', usernames, 'username'),
        ('vlan', vlans, 'vlan'),
        ('ap', access_points, 'ap'),
        ('role', roles, 'role'),
        ('bandwith', bandwith_contracts, 'bandwith'),
        ('uptime', uptime, 'uptime'),
    ]
    for field, rows, label in columns:
        for data in rows.values():
            value = data['value']
            if field == 'auth':
                value = auth_labels.get(value, value)
            client = clients.get(data['oid_end'])
            if client is None:
                print('Missing client_id for ' + label, data)
            else:
                client[field] = value

    # NOTE: the file name comes from sys.argv[1], not from ``host``.
    # newline='' is what the csv module requires of files it writes to;
    # without it, platforms that translate '\n' emit blank lines between rows.
    with open(sys.argv[1] + '_wifi.csv', 'w', newline='') as out_handle:
        writer = csv.DictWriter(out_handle, fieldnames=fieldnames)
        writer.writeheader()
        for client in clients.values():
            for name in fieldnames:
                if name not in client:
                    client[name] = ''
                if not isinstance(client[name], str):
                    client[name] = json.dumps(client[name])
            writer.writerow(client)
def get_aruba_info(host, community):
    """Collect per-interface information from an Aruba device.

    The VLAN table is tried at three OIDs in turn: the default one, the
    Aruba-controller one and the Aruba CX one. If the controller OID answers,
    the wireless client export is written instead and the process exits.

    Args:
        host: Agent address.
        community: SNMP community string.

    Returns:
        OrderedDict mapping interface id -> OrderedDict with the keys
        ``name``, ``type``, ``admin_state``, ``last_change``, ``pvid``,
        ``ifstack``, ``vlans``, ``mac_count`` and ``macs``.
    """
    global retData

    def walk(oid, *extra):
        # Fetch one subtree and snapshot it before the next fetch replaces it.
        get_bulk_oid(host, community, oid, *extra)
        return copy.deepcopy(retData)

    vlans = walk('1.3.6.1.2.1.17.7.1.4.3.1.1')  # vlans
    arubacx = False
    if not vlans:  # try aruba controller
        vlans = walk('1.3.6.1.4.1.14823.2.2.1.1.2.1.1.9', True)
        if vlans:
            get_aruba_controller_info(host, community, vlans)
            sys.exit()
    if not vlans:  # try aruba cx
        arubacx = True
        vlans = walk('1.3.111.2.802.1.1.4.1.4.3', True)
    # The VLAN id is the last component of each row's OID suffix.
    vlan_names = {
        row['oid_end'].split('.')[-1]: row['value'] for row in vlans.values()
    }

    interface_names = walk('1.3.6.1.2.1.2.2.1.2')  # interface name
    interface_types = walk('1.3.6.1.2.1.2.2.1.3')  # interface type
    mac_addresses = walk('1.3.6.1.2.1.17.4.3.1')  # mac address
    interface_changes = walk('1.3.6.1.2.1.2.2.1.9')  # last change
    admin_state = walk('1.3.6.1.2.1.2.2.1.7')  # admin state
    if arubacx:
        pvid = walk('1.3.111.2.802.1.1.4.1.4.5.1.1')  # pvid
    else:
        pvid = walk('1.3.6.1.2.1.17.7.1.4.5.1.1')  # pvid
    # Sub-interfaces are not queried on Aruba CX.
    if arubacx:
        ifstack = OrderedDict()
    else:
        ifstack = walk('1.3.6.1.2.1.31.1.2.1.3')  # ifstack

    # Create one record per interface name, with default values.
    interfaces = OrderedDict()
    for row in interface_names.values():
        interfaces[row['oid_end']] = OrderedDict([
            ('name', row['value']),
            ('type', ''),
            ('admin_state', ''),
            ('last_change', ''),
            ('pvid', ''),
            ('ifstack', []),
            ('vlans', []),
            ('mac_count', 0),
            ('macs', []),
        ])

    # Interface type
    for row in interface_types.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for interface type', row)
        else:
            entry['type'] = row['value']

    # Last change
    for row in interface_changes.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for interface change', row)
        else:
            entry['last_change'] = row['value']

    # Admin state: '1' and '2' are written as 'up' and 'down', anything else
    # is kept as reported.
    state_labels = {'1': 'up', '2': 'down'}
    for row in admin_state.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for admin state', row)
        else:
            entry['admin_state'] = state_labels.get(row['value'], row['value'])

    # PVID, written as '<id>/<name>' when the VLAN is known
    for row in pvid.values():
        interface_id = row['oid_end'].split('.')[-1]
        vlan_id = row['value']
        if vlan_id in vlan_names:
            vlan_id = vlan_id + '/' + vlan_names[vlan_id]
        else:
            print('Missing vlan_id from vlan_names', vlan_id)
        if interface_id in interfaces:
            interfaces[interface_id]['pvid'] = vlan_id
        else:
            print('Missing interface name for pvid', row)

    # Sub interfaces: each row's OID suffix is '<first id>.<second id>'; the
    # first interface's name is recorded on the second interface.
    for row in ifstack.values():
        id_parts = row['oid_end'].split('.')
        first_id = id_parts[0]
        second_id = id_parts[1]
        if first_id not in interfaces or second_id not in interfaces:
            continue
        stacked_name = interfaces[first_id]['name']
        if stacked_name == 'DEFAULT_VLAN':
            stacked_name = '1/' + stacked_name
        if re.search('^VLAN[0-9]+$', stacked_name):
            vlan_id = stacked_name.replace('VLAN', '')
            if vlan_id in vlan_names:
                stacked_name = vlan_id + '/' + vlan_names[vlan_id]
        # The interface's own PVID is not repeated in its ifstack list.
        if interfaces[second_id]['pvid'] != stacked_name:
            interfaces[second_id]['ifstack'].append(stacked_name)

    # MAC addresses per interface
    for interface_id, macs in map_mac_to_interface(mac_addresses).items():
        if interface_id in interfaces:
            interfaces[interface_id]['macs'] = macs
            interfaces[interface_id]['mac_count'] = len(macs)
    return interfaces
def get_cisco_info(host, community):
    """Collect per-interface information from a Cisco switch.

    Args:
        host: Agent address.
        community: SNMP community string.

    Returns:
        OrderedDict mapping interface id -> OrderedDict with the keys
        ``name``, ``type``, ``admin_state``, ``last_change``, ``pvid``,
        ``ifstack``, ``vlans``, ``mac_count`` and ``macs``. ``pvid`` is left
        empty; ``vlans`` and ``macs`` come from the per-VLAN MAC walk.
    """
    global retData

    def walk(oid):
        # Fetch one subtree and snapshot it before the next fetch replaces it.
        get_bulk_oid(host, community, oid)
        return copy.deepcopy(retData)

    vlan_rows = walk('1.3.6.1.4.1.9.9.46.1.3.1.1.4')  # vlans
    # The VLAN id is the last component of each row's OID suffix.
    vlan_names = {
        row['oid_end'].split('.')[-1]: row['value'] for row in vlan_rows.values()
    }
    interface_names = walk('1.3.6.1.2.1.31.1.1.1.1')  # interface name
    interface_types = walk('1.3.6.1.2.1.2.2.1.3')  # interface type
    interface_changes = walk('1.3.6.1.2.1.2.2.1.9')  # last change
    admin_state = walk('1.3.6.1.2.1.2.2.1.7')  # admin state
    ifstack = walk('1.3.6.1.2.1.31.1.2.1.3')  # ifstack

    # Create one record per interface name, with default values.
    interfaces = OrderedDict()
    for row in interface_names.values():
        interfaces[row['oid_end']] = OrderedDict([
            ('name', row['value']),
            ('type', ''),
            ('admin_state', ''),
            ('last_change', ''),
            ('pvid', ''),
            ('ifstack', []),
            ('vlans', []),
            ('mac_count', 0),
            ('macs', []),
        ])

    # Interface type
    for row in interface_types.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for interface type', row)
        else:
            entry['type'] = row['value']

    # Last change
    for row in interface_changes.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for interface change', row)
        else:
            entry['last_change'] = row['value']

    # Admin state: '1' and '2' are written as 'up' and 'down', anything else
    # is kept as reported.
    state_labels = {'1': 'up', '2': 'down'}
    for row in admin_state.values():
        entry = interfaces.get(row['oid_end'])
        if entry is None:
            print('Missing interface name for admin state', row)
        else:
            entry['admin_state'] = state_labels.get(row['value'], row['value'])

    # Sub interfaces: each row's OID suffix is '<first id>.<second id>'; the
    # first interface's name is recorded on the second interface.
    for row in ifstack.values():
        id_parts = row['oid_end'].split('.')
        first_id = id_parts[0]
        second_id = id_parts[1]
        if first_id not in interfaces or second_id not in interfaces:
            continue
        stacked_name = interfaces[first_id]['name']
        if stacked_name == 'DEFAULT_VLAN':
            stacked_name = '1/' + stacked_name
        if re.search('^VLAN[0-9]+$', stacked_name):
            vlan_id = stacked_name.replace('VLAN', '')
            if vlan_id in vlan_names:
                stacked_name = vlan_id + '/' + vlan_names[vlan_id]
        if interfaces[second_id]['pvid'] != stacked_name:
            interfaces[second_id]['ifstack'].append(stacked_name)

    # Merge the per-VLAN MAC tables into the interface records.
    vlan_interfaces = get_cisco_vlan_macs(host, community, vlan_names)
    for interface_id, interface_vlans in vlan_interfaces.items():
        entry = interfaces.get(interface_id)
        if entry is None:
            continue
        vlan_labels = []
        for vlan_id, macs in interface_vlans.items():
            if vlan_id in vlan_names:
                vlan_labels.append(vlan_id + '/' + vlan_names[vlan_id])
            else:
                vlan_labels.append(vlan_id)
            entry['macs'] = entry['macs'] + macs
        entry['mac_count'] = len(entry['macs'])
        entry['vlans'] = vlan_labels
    return interfaces
# Script entry: expects exactly two arguments, <host> <community>.
if len(sys.argv) != 3:
    print('Need args')
    sys.exit()
interfaces = get_switch_interface_info(sys.argv[1], sys.argv[2])
# Column order of the output file; also the keys of every interface record.
fieldnames = [
    'name',
    'type',
    'admin_state',
    'last_change',
    'pvid',
    'ifstack',
    'vlans',
    'mac_count',
    'macs',
]
# The report is written to '<host>.csv'. newline='' is what the csv module
# requires of files it writes to; without it, platforms that translate '\n'
# emit blank lines between rows.
with open(sys.argv[1] + '.csv', 'w', newline='') as out_handle:
    writer = csv.DictWriter(out_handle, fieldnames=fieldnames)
    writer.writeheader()
    for interface in interfaces.values():
        for name in fieldnames:
            if name not in interface:
                interface[name] = ''
            # Lists and counters are serialised as JSON so each fits one cell.
            if not isinstance(interface[name], str):
                interface[name] = json.dumps(interface[name])
        writer.writerow(interface)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uses SNMP to collect basic interface configuration and the MAC addresses associated with each interface and VLAN.
Requires pysnmp:
pip3 install pysnmp-lextudio