Skip to content

Instantly share code, notes, and snippets.

@nickadam
Last active August 26, 2023 17:34
Show Gist options
  • Save nickadam/7a89b80235d34074dc4f59afda15c070 to your computer and use it in GitHub Desktop.
Save nickadam/7a89b80235d34074dc4f59afda15c070 to your computer and use it in GitHub Desktop.
Get switch interface mac addresses via snmp (tested with aruba os, aruba cx, aruba wireless controllers, and cisco)
from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher
from pysnmp.carrier.asyncore.dgram import udp
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto.api import v2c
from pysnmp.proto import api
from time import time
from collections import OrderedDict
import copy
import json
import re
import csv
import sys
startedAt = time()
retData = OrderedDict()
def map_mac_to_interface(mac_addresses, vlan_id=0):
mac_map = OrderedDict()
dd_map = OrderedDict()
for oid in mac_addresses:
data = mac_addresses[oid]
oid_next = data['oid_end'].split('.')[0]
mac_dd = '.'.join(data['oid_end'].split('.')[1:])
if oid_next == '1':
dd_map[mac_dd] = data['value'].replace('0x', '')
if oid_next == '2':
if mac_dd not in dd_map:
print('Mac dotted decimal interface not in mac table')
continue
if data['value'] not in mac_map:
mac_map[data['value']] = []
mac = dd_map[mac_dd]
if vlan_id:
mac = mac + '@' + vlan_id
mac_map[data['value']].append(mac)
return mac_map
def get_bulk_oid_f(host, community, oidstr, ignore_timeout=False):
global retData
global startedAt
startedAt = time()
retData = OrderedDict()
# SNMP table header
headVars = [v2c.ObjectIdentifier(oidstr)]
# Build PDU
reqPDU = v2c.GetBulkRequestPDU()
v2c.apiBulkPDU.setDefaults(reqPDU)
v2c.apiBulkPDU.setNonRepeaters(reqPDU, 0)
v2c.apiBulkPDU.setMaxRepetitions(reqPDU, 25)
v2c.apiBulkPDU.setVarBinds(reqPDU, [(x, v2c.null) for x in headVars])
# Build message
reqMsg = v2c.Message()
v2c.apiMessage.setDefaults(reqMsg)
v2c.apiMessage.setCommunity(reqMsg, community)
v2c.apiMessage.setPDU(reqMsg, reqPDU)
def cbTimerFun(timeNow):
if timeNow - startedAt > 5:
if not ignore_timeout:
raise ValueError("Request timed out")
transportDispatcher.jobFinished(1)
# noinspection PyUnusedLocal
def cbRecvFun(
transportDispatcher,
transportDomain,
transportAddress,
wholeMsg,
reqPDU=reqPDU,
headVars=headVars,
):
while wholeMsg:
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=v2c.Message())
rspPDU = v2c.apiMessage.getPDU(rspMsg)
# Match response to request
if v2c.apiBulkPDU.getRequestID(reqPDU) == v2c.apiBulkPDU.getRequestID(rspPDU):
# Format var-binds table
varBindTable = v2c.apiBulkPDU.getVarBindTable(reqPDU, rspPDU)
# Check for SNMP errors reported
errorStatus = v2c.apiBulkPDU.getErrorStatus(rspPDU)
if errorStatus and errorStatus != 2:
errorIndex = v2c.apiBulkPDU.getErrorIndex(rspPDU)
print(
"{} at {}".format(
errorStatus.prettyPrint(),
errorIndex and varBindTable[int(errorIndex) - 1] or "?",
)
)
transportDispatcher.jobFinished(1)
break
# Report SNMP table
stop = False
for tableRow in varBindTable:
for name, val in tableRow:
if oidstr + '.' not in name.prettyPrint():
stop = True
break
oid_end = name.prettyPrint().replace(oidstr + '.', '')
retData[name.prettyPrint()] = {'oid': oidstr, 'oid_end': oid_end, 'value': val.prettyPrint()}
# print(
# "from: {}, {} = {}".format(
# transportAddress, name.prettyPrint(), val.prettyPrint()
# )
# )
# Stop on end of missing headvar
if stop:
transportDispatcher.jobFinished(1)
# Stop on EOM
for oid, val in varBindTable[-1]:
if not isinstance(val, v2c.Null):
break
else:
transportDispatcher.jobFinished(1)
# Generate request for next row
v2c.apiBulkPDU.setVarBinds(
reqPDU, [(x, v2c.null) for x, y in varBindTable[-1]]
)
v2c.apiBulkPDU.setRequestID(reqPDU, v2c.getNextRequestID())
transportDispatcher.sendMessage(
encoder.encode(reqMsg), transportDomain, transportAddress
)
global startedAt
if time() - startedAt > 5:
if not ignore_timeout:
raise ValueError("Request timed out")
transportDispatcher.jobFinished(1)
startedAt = time()
return retData
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
transportDispatcher.registerTimerCbFun(cbTimerFun)
transportDispatcher.registerTransport(
udp.domainName, udp.UdpSocketTransport().openClientMode()
)
transportDispatcher.sendMessage(
encoder.encode(reqMsg), udp.domainName, (host, 161)
)
transportDispatcher.jobStarted(1)
# Dispatcher will finish as job#1 counter reaches zero
transportDispatcher.runDispatcher()
transportDispatcher.closeDispatcher()
def get_bulk_oid(host, community, oidstr, ignore_timeout=False, retry=0):
if retry == 3:
if oidstr == '1.3.6.1.2.1.1.1': # first system call
print(host, 'timeout', file=sys.stderr)
sys.exit(1)
raise ValueError("Too many retries from get_bulk_oid", host, oidstr)
try:
return get_bulk_oid_f(host, community, oidstr, ignore_timeout)
except ValueError as e:
retry = retry + 1
print(host, 'retrying', oidstr, file=sys.stderr)
return get_bulk_oid(host, community, oidstr, ignore_timeout, retry)
def get_oid_f(host, community, oidstr):
global startedAt
global retData
retData = OrderedDict()
startedAt = time()
# Protocol version to use
pMod = api.protoModules[api.protoVersion1]
# pMod = api.protoModules[api.protoVersion2c]
# SNMP table header
headVars = [pMod.ObjectIdentifier(oidstr)]
# Build PDU
reqPDU = pMod.GetNextRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
pMod.apiPDU.setVarBinds(reqPDU, [(x, pMod.null) for x in headVars])
# Build message
reqMsg = pMod.Message()
pMod.apiMessage.setDefaults(reqMsg)
pMod.apiMessage.setCommunity(reqMsg, community)
pMod.apiMessage.setPDU(reqMsg, reqPDU)
def cbTimerFun(timeNow):
if timeNow - startedAt > 5:
raise ValueError("Request timed out")
# noinspection PyUnusedLocal
def cbRecvFun(
transportDispatcher,
transportDomain,
transportAddress,
wholeMsg,
reqPDU=reqPDU,
headVars=headVars,
):
while wholeMsg:
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus and errorStatus != 2:
raise ValueError(errorStatus)
# Format var-binds table
varBindTable = pMod.apiPDU.getVarBindTable(reqPDU, rspPDU)
stop = False
for tableRow in varBindTable:
for name, val in tableRow:
if oidstr + '.' not in name.prettyPrint():
stop = True
break
oid_end = name.prettyPrint().replace(oidstr + '.', '')
retData[name.prettyPrint()] = {'oid': oidstr, 'oid_end': oid_end, 'value': val.prettyPrint()}
# Stop on end of missing headvar
if stop:
transportDispatcher.jobFinished(1)
# Stop on EOM
for oid, val in varBindTable[-1]:
if not isinstance(val, pMod.Null):
break
else:
transportDispatcher.jobFinished(1)
# Generate request for next row
pMod.apiPDU.setVarBinds(
reqPDU, [(x, pMod.null) for x, y in varBindTable[-1]]
)
pMod.apiPDU.setRequestID(reqPDU, pMod.getNextRequestID())
transportDispatcher.sendMessage(
encoder.encode(reqMsg), transportDomain, transportAddress
)
global startedAt
if time() - startedAt > 5:
raise ValueError("Request timed out")
startedAt = time()
return retData
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
transportDispatcher.registerTimerCbFun(cbTimerFun)
transportDispatcher.registerTransport(
udp.domainName, udp.UdpSocketTransport().openClientMode()
)
transportDispatcher.sendMessage(
encoder.encode(reqMsg), udp.domainName, (host, 161)
)
transportDispatcher.jobStarted(1)
transportDispatcher.runDispatcher()
transportDispatcher.closeDispatcher()
def get_oid(host, community, oidstr, retry=0):
if retry == 3:
raise ValueError("Too many retries from get_bulk_oid", host, oidstr)
try:
return get_oid_f(host, community, oidstr)
except ValueError as e:
retry = retry + 1
print(host, 'retrying', oidstr, file=sys.stderr)
return get_oid(host, community, oidstr, retry)
def get_tag_interfaces(hexcode):
hexcode = hexcode.replace('0x', '')
int_list = []
scale = 16
ss = bin(int(hexcode, scale))[2:]
p=1
for s in ss:
if int(s) == 1:
int_list.append(str(p))
p+=1
return int_list
def get_cisco_vlan_macs(host, community, vlan_names):
# https://www.cisco.com/c/en/us/support/docs/ip/simple-network-management-protocol-snmp/44800-mactoport44800.html
# https://www.cisco.com/c/en/us/support/docs/ip/simple-network-management-protocol-snmp/40700-snmp-ifIndex40700.html
global retData
vlans = []
for vlan_id in vlan_names:
vlan_name = vlan_names[vlan_id]
if re.search('-default', vlan_name, re.I):
continue
vlans.append(vlan_id)
vlan_interfaces = OrderedDict()
# {'interface_id': {'vlan_id': ['mac']}}
for vlan_id in vlans:
# get vlans
get_bulk_oid(host, community + '@' + vlan_id, '1.3.6.1.2.1.17.4.3.1') # mac address
mac_addresses = copy.deepcopy(retData)
if(not len(mac_addresses)):
continue
# get base ports
get_bulk_oid(host, community + '@' + vlan_id, '1.3.6.1.2.1.17.1.4.1.2') # base ports
base_port_map = copy.deepcopy(retData)
base_ports = {}
for oid in base_port_map:
data = base_port_map[oid]
base_ports[data['oid_end']] = data['value']
# map it all together
mac_map = map_mac_to_interface(mac_addresses, vlan_id)
for port_id in mac_map:
macs = mac_map[port_id]
interface_id = port_id
if port_id in base_ports:
interface_id = base_ports[port_id]
if interface_id not in vlan_interfaces:
vlan_interfaces[interface_id] = OrderedDict()
vlan_interfaces[interface_id][vlan_id] = macs
return vlan_interfaces
def get_switch_interface_info(host, community):
global retData
# get_bulk_oid(host, community, '1.3.6.1.2.1.47.1.1.1.1') # components
# get system info
get_bulk_oid(host, community, '1.3.6.1.2.1.1.1') # system
check_sys_info = copy.deepcopy(retData)
sys_info = {'value': ''}
sys_info_value = ''
for oid in check_sys_info:
sys_info = check_sys_info[oid]
sys_info_value = sys_info['value']
if re.search('^0x', sys_info_value):
sys_info_value = bytes.fromhex(sys_info_value.replace('0x', '')).decode('utf-8')
if re.search('cisco', sys_info_value, re.I):
return get_cisco_info(host, community)
else:
return get_aruba_info(host, community)
def get_aruba_controller_info(host, community, vlans):
# get mac addresses
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.2') # mac address
mac_addresses = copy.deepcopy(retData)
# get usernames
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.3') # username
usernames = copy.deepcopy(retData)
# get roles
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.4') # role
roles = copy.deepcopy(retData)
# get uptime
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.5') # uptime
uptime = copy.deepcopy(retData)
# get auth method
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.6') # auth method
auth_methods = copy.deepcopy(retData)
# get access point
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.7') # access point
access_points = copy.deepcopy(retData)
# get bandwith contract
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.12') # bandwith
bandwith_contracts = copy.deepcopy(retData)
clients = OrderedDict()
for oid in mac_addresses:
data = mac_addresses[oid]
client_id = data['oid_end']
clients[client_id] = OrderedDict()
clients[client_id]['mac'] = data['value'].replace('0x', '')
clients[client_id]['ip'] = client_id
clients[client_id]['auth'] = ''
clients[client_id]['username'] = ''
clients[client_id]['vlan'] = ''
clients[client_id]['ap'] = ''
clients[client_id]['role'] = ''
clients[client_id]['bandwith'] = ''
clients[client_id]['uptime'] = ''
# set auth
for oid in auth_methods:
data = auth_methods[oid]
client_id = data['oid_end']
value = data['value']
if value == '1':
value = 'none'
if value == '2':
value = 'other'
if value == '3':
value = 'web'
if value == '4':
value = 'dot1x'
if value == '5':
value = 'vpn'
if value == '6':
value = 'mac'
if client_id in clients:
clients[client_id]['auth'] = value
else:
print('Missing client_id for auth method', data)
# set username
for oid in usernames:
data = usernames[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['username'] = value
else:
print('Missing client_id for username', data)
# set vlan
for oid in vlans:
data = vlans[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['vlan'] = value
else:
print('Missing client_id for vlan', data)
# set ap
for oid in access_points:
data = access_points[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['ap'] = value
else:
print('Missing client_id for ap', data)
# set role
for oid in roles:
data = roles[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['role'] = value
else:
print('Missing client_id for role', data)
# set bandwith
for oid in bandwith_contracts:
data = bandwith_contracts[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['bandwith'] = value
else:
print('Missing client_id for bandwith', data)
# set uptime
for oid in uptime:
data = uptime[oid]
client_id = data['oid_end']
value = data['value']
if client_id in clients:
clients[client_id]['uptime'] = value
else:
print('Missing client_id for uptime', data)
fieldnames = [
'mac',
'ip',
'auth',
'username',
'vlan',
'ap',
'role',
'bandwith',
'uptime',
]
with open(sys.argv[1] + '_wifi.csv', 'w') as out_handle:
writer = csv.DictWriter(out_handle, fieldnames=fieldnames)
writer.writeheader()
for client_id in clients:
interface = clients[client_id]
for name in fieldnames:
if name not in interface:
interface[name] = ''
if type(interface[name]) != str:
interface[name] = json.dumps(interface[name])
writer.writerow(interface)
def get_aruba_info(host, community):
global retData
# get vlans
vlan_oid = '1.3.6.1.2.1.17.7.1.4.3.1.1'
get_bulk_oid(host, community, vlan_oid) # vlans
vlans = copy.deepcopy(retData)
arubacx = False
if len(vlans) == 0: # try aruba controller
get_bulk_oid(host, community, '1.3.6.1.4.1.14823.2.2.1.1.2.1.1.9', True)
vlans = copy.deepcopy(retData)
if(len(vlans) != 0):
get_aruba_controller_info(host, community, vlans)
sys.exit()
if len(vlans) == 0: # try aruba cx
arubacx = True
get_bulk_oid(host, community, '1.3.111.2.802.1.1.4.1.4.3', True)
vlans = copy.deepcopy(retData)
vlan_names = {}
for oid in vlans:
data = vlans[oid]
vlan_names[data['oid_end'].split('.')[-1]] = data['value']
# get interface name
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.2') # interface name
interface_names = copy.deepcopy(retData)
# get interface types
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.3') # interface type
interface_types = copy.deepcopy(retData)
# get mac addresses
get_bulk_oid(host, community, '1.3.6.1.2.1.17.4.3.1') # mac address
mac_addresses = copy.deepcopy(retData)
# get last interface change
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.9') # last change
interface_changes = copy.deepcopy(retData)
# get interface admin state
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.7') # admin state
admin_state = copy.deepcopy(retData)
# get interface pvid
pvid_oid = '1.3.6.1.2.1.17.7.1.4.5.1.1'
if arubacx:
pvid_oid = '1.3.111.2.802.1.1.4.1.4.5.1.1'
get_bulk_oid(host, community, pvid_oid) # pvid
pvid = copy.deepcopy(retData)
# get sub interfaces
ifstack = OrderedDict()
if not arubacx:
get_bulk_oid(host, community, '1.3.6.1.2.1.31.1.2.1.3') # ifstack
ifstack = copy.deepcopy(retData)
# create interfaces object and default values
interfaces = OrderedDict()
for oid in interface_names:
data = interface_names[oid]
interfaces[data['oid_end']] = OrderedDict()
interfaces[data['oid_end']]['name'] = data['value']
interfaces[data['oid_end']]['type'] = ''
interfaces[data['oid_end']]['admin_state'] = ''
interfaces[data['oid_end']]['last_change'] = ''
interfaces[data['oid_end']]['pvid'] = ''
interfaces[data['oid_end']]['ifstack'] = []
interfaces[data['oid_end']]['vlans'] = []
interfaces[data['oid_end']]['mac_count'] = 0
interfaces[data['oid_end']]['macs'] = []
# set interface type
for oid in interface_types:
data = interface_types[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['type'] = data['value']
else:
print('Missing interface name for interface type', data)
# set interface last change
for oid in interface_changes:
data = interface_changes[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['last_change'] = data['value']
else:
print('Missing interface name for interface change', data)
# set interface admin state
for oid in admin_state:
data = admin_state[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['admin_state'] = data['value']
if data['value'] == '1':
interfaces[interface_id]['admin_state'] = 'up'
if data['value'] == '2':
interfaces[interface_id]['admin_state'] = 'down'
else:
print('Missing interface name for admin state', data)
# set interface pvid
for oid in pvid:
data = pvid[oid]
interface_id = data['oid_end'].split('.')[-1]
vlan_id = data['value']
if vlan_id in vlan_names:
vlan_id = vlan_id + '/' + vlan_names[vlan_id]
else:
print('Missing vlan_id from vlan_names', vlan_id)
if interface_id in interfaces:
interfaces[interface_id]['pvid'] = vlan_id
else:
print('Missing interface name for pvid', data)
# set sub interfaces
for oid in ifstack:
data = ifstack[oid]
int1_id = data['oid_end'].split('.')[0]
int2_id = data['oid_end'].split('.')[1]
if int1_id not in interfaces:
continue
if int2_id not in interfaces:
continue
int1_name = interfaces[int1_id]['name']
if int1_name == 'DEFAULT_VLAN':
int1_name = '1/' + int1_name
if re.search('^VLAN[0-9]+$', int1_name):
vlan_id = int1_name.replace('VLAN', '')
if vlan_id in vlan_names:
int1_name = vlan_id + '/' + vlan_names[vlan_id]
if interfaces[int2_id]['pvid'] != int1_name:
interfaces[int2_id]['ifstack'].append(int1_name)
# set list of mac addresses
mac_map = map_mac_to_interface(mac_addresses)
for interface_id in mac_map:
macs = mac_map[interface_id]
if interface_id in interfaces:
interfaces[interface_id]['macs'] = macs
interfaces[interface_id]['mac_count'] = len(macs)
# done
return interfaces
def get_cisco_info(host, community):
global retData
# get vlans
vlan_oid = '1.3.6.1.4.1.9.9.46.1.3.1.1.4'
get_bulk_oid(host, community, vlan_oid) # vlans
vlans = copy.deepcopy(retData)
vlan_names = {}
for oid in vlans:
data = vlans[oid]
vlan_names[data['oid_end'].split('.')[-1]] = data['value']
# get interface names
get_bulk_oid(host, community, '1.3.6.1.2.1.31.1.1.1.1') # interface name
interface_names = copy.deepcopy(retData)
# get interface types
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.3') # interface type
interface_types = copy.deepcopy(retData)
# get last interface change
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.9') # last change
interface_changes = copy.deepcopy(retData)
# get interface admin state
get_bulk_oid(host, community, '1.3.6.1.2.1.2.2.1.7') # admin state
admin_state = copy.deepcopy(retData)
# get sub interfaces
get_bulk_oid(host, community, '1.3.6.1.2.1.31.1.2.1.3') # ifstack
ifstack = copy.deepcopy(retData)
# create interfaces object and default values
interfaces = OrderedDict()
for oid in interface_names:
data = interface_names[oid]
interfaces[data['oid_end']] = OrderedDict()
interfaces[data['oid_end']]['name'] = data['value']
interfaces[data['oid_end']]['type'] = ''
interfaces[data['oid_end']]['admin_state'] = ''
interfaces[data['oid_end']]['last_change'] = ''
interfaces[data['oid_end']]['pvid'] = ''
interfaces[data['oid_end']]['ifstack'] = []
interfaces[data['oid_end']]['vlans'] = []
interfaces[data['oid_end']]['mac_count'] = 0
interfaces[data['oid_end']]['macs'] = []
# set interface type
for oid in interface_types:
data = interface_types[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['type'] = data['value']
else:
print('Missing interface name for interface type', data)
# set interface last change
for oid in interface_changes:
data = interface_changes[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['last_change'] = data['value']
else:
print('Missing interface name for interface change', data)
# set interface admin state
for oid in admin_state:
data = admin_state[oid]
interface_id = data['oid_end']
if interface_id in interfaces:
interfaces[interface_id]['admin_state'] = data['value']
if data['value'] == '1':
interfaces[interface_id]['admin_state'] = 'up'
if data['value'] == '2':
interfaces[interface_id]['admin_state'] = 'down'
else:
print('Missing interface name for admin state', data)
for oid in ifstack:
data = ifstack[oid]
int1_id = data['oid_end'].split('.')[0]
int2_id = data['oid_end'].split('.')[1]
if int1_id not in interfaces:
continue
if int2_id not in interfaces:
continue
int1_name = interfaces[int1_id]['name']
if int1_name == 'DEFAULT_VLAN':
int1_name = '1/' + int1_name
if re.search('^VLAN[0-9]+$', int1_name):
vlan_id = int1_name.replace('VLAN', '')
if vlan_id in vlan_names:
int1_name = vlan_id + '/' + vlan_names[vlan_id]
if interfaces[int2_id]['pvid'] != int1_name:
interfaces[int2_id]['ifstack'].append(int1_name)
vlan_interfaces = get_cisco_vlan_macs(host, community, vlan_names)
for interface_id in vlan_interfaces:
vlans = []
if interface_id not in interfaces:
continue
interface_vlans = vlan_interfaces[interface_id]
for vlan_id in interface_vlans:
if vlan_id in vlan_names:
vlans.append(vlan_id + '/' + vlan_names[vlan_id])
else:
vlans.append(vlan_id)
macs = interface_vlans[vlan_id]
interfaces[interface_id]['macs'] = interfaces[interface_id]['macs'] + macs
interfaces[interface_id]['mac_count'] = len(interfaces[interface_id]['macs'])
interfaces[interface_id]['vlans'] = vlans
return interfaces
if len(sys.argv) != 3:
print('Need args')
sys.exit()
interfaces = get_switch_interface_info(sys.argv[1], sys.argv[2])
fieldnames = [
'name',
'type',
'admin_state',
'last_change',
'pvid',
'ifstack',
'vlans',
'mac_count',
'macs',
]
with open(sys.argv[1] + '.csv', 'w') as out_handle:
writer = csv.DictWriter(out_handle, fieldnames=fieldnames)
writer.writeheader()
for interface_id in interfaces:
interface = interfaces[interface_id]
for name in fieldnames:
if name not in interface:
interface[name] = ''
if type(interface[name]) != str:
interface[name] = json.dumps(interface[name])
writer.writerow(interface)
@nickadam
Copy link
Author

nickadam commented Aug 25, 2023

Uses SNMP to get basic interface config information and mac addresses associated with interfaces and vlans.
pip3 install pysnmp-lextudio required

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment