Skip to content

Instantly share code, notes, and snippets.

@dorkmatt
Last active June 28, 2022 11:32
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dorkmatt/e04a10e3a8c8b60ec4b17babd376f599 to your computer and use it in GitHub Desktop.
Save dorkmatt/e04a10e3a8c8b60ec4b17babd376f599 to your computer and use it in GitHub Desktop.
Netbox Juniper/Arista optics audit
#!/usr/bin/env python
#
# Fetch list of Juniper/Arista devices via Netbox, query optics info via Netconf (Juniper) or SSH CLI (Arista), write to CSV file
#
# Written at 34c4 in Leipzig, Germany
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Matt Peterson <matt@peterson.org>
import sys
import os
import pwd
import json
import pprint
import csv
import re
# Third-party dependencies. A missing package aborts the script with an
# install hint instead of a raw traceback.
try:
    from jnpr.junos import Device
    from lxml import etree
    from netaddr import *
    import pynetbox
    from netmiko import ConnectHandler
except ImportError:
    # "except ImportError, e" only parses on Python 2 and the bound name was
    # never used; this spelling is valid on Python 2 and 3 alike.
    sys.exit("Please install required python modules: junos-eznc lxml netaddr pynetbox netmiko")
# Shared pretty-printer for the final dump of the collected optics.
pp = pprint.PrettyPrinter(indent=4)
# Login name of the invoking user (first passwd field); used for device logins.
username = pwd.getpwuid(os.getuid()).pw_name
def fetch_netbox(brand):
    """Walk the Netbox devices of one manufacturer and audit each of them.

    Devices are selected with ``manufacturer=brand`` and ``status=1``. Every
    device that has a primary IPv4 address is handed to the inventory
    function for its brand.

    brand -- manufacturer filter value passed to Netbox. Only "juniper" and
             "arista" are dispatched; for any other value the device is
             reported with the "No IPv4 address found" message, as before.
    """
    try:
        netbox = pynetbox.api('https://netbox/', token='xyz')
    except AttributeError:
        sys.exit("Failed to connect to netbox instance")

    for device in netbox.dcim.devices.filter(manufacturer=brand, status=1):
        host = str(device)
        print("### Netbox: " + host)
        host_data = netbox.dcim.devices.get(name=host)
        # The address lives on the device record. Reading it from the name
        # string (as the code previously did) always raised AttributeError.
        primary_ip4 = host_data.primary_ip4 if host_data is not None else None
        if primary_ip4 is None or brand not in ("juniper", "arista"):
            print("## No IPv4 address found for " + host)
            continue
        # IPNetwork(...).ip drops any prefix length and keeps the host address.
        v4_address = IPNetwork(str(primary_ip4)).ip
        if brand == "juniper":
            fetch_juniper_inventory(host, v4_address, username)
        else:
            fetch_arista_inventory(host, v4_address)
def fetch_arista_inventory(host, v4_address):
    """Collect transceiver details from one Arista device over SSH.

    Runs the JSON variants of "show version", "show interfaces transceiver"
    and "show inventory", plus one wavelength query per populated interface.
    Each optic found is appended to the module-level ``optics_list`` and
    written as one row through ``write_csv_file``.

    host       -- device name, used for log output and in the recorded rows.
    v4_address -- management address to connect to (converted with str()).

    Returns False when the SSH session cannot be opened, otherwise None.
    """
    ssh_user = 'admin'
    arista_options = {
        'device_type': 'arista_eos',
        'ip': str(v4_address),
        'username': ssh_user,
        'key_file': "../access/creds/id_rsa",
    }
    # Log the account that is really used for the login; the local user name
    # printed here before was misleading.
    print("### Connecting " + ssh_user + "@" + host)
    try:
        arista_device = ConnectHandler(**arista_options)
    except Exception as e:
        print(e.__doc__)
        # str(e) works on Python 2 and 3; e.message is gone on Python 3.
        print(str(e))
        return False

    try:
        prompt = arista_device.find_prompt().replace("#", "")
        show_version = json.loads(arista_device.send_command("show version | json"))
        print("## Connected to " + prompt + " S/N (" + show_version['serialNumber'] + ")")
        transceivers = json.loads(arista_device.send_command("show interfaces transceiver | json"))
        show_inventory = json.loads(arista_device.send_command("show inventory | json"))
        xcvr_slots = show_inventory.get('xcvrSlots', {})

        for key, value in transceivers["interfaces"].items():
            if 'mediaType' not in value:
                continue

            raw_wavelength = arista_device.send_command(
                'show interfaces ' + key + ' transceiver hardware | include ^Wavelength')
            # No "Wavelength: ..." line in the output: record None rather
            # than crash on the missing field.
            _, separator, reading = raw_wavelength.partition(": ")
            wavelength = reading.strip() + " nm" if separator else None

            # NOTE(review): every digit run of the interface name is
            # concatenated ("Ethernet49/1" -> "491"); confirm this matches the
            # xcvrSlots keys on breakout and modular ports.
            slot_key = ''.join(re.findall(r'\d+', key))
            slot = xcvr_slots.get(slot_key)
            if slot is None:
                print("## No inventory entry for " + key + " on " + host)
                slot = {}
            vendor_name = slot.get('mfgName')
            model_name = slot.get('modelName')
            serial_number = value.get('vendorSn')

            optics_list.append({
                "host": host,
                "port": key,
                "description": value['mediaType'],
                "cable_type": value['mediaType'],
                "sfp-vendor-name": vendor_name,
                "sfp-vendor-pno": model_name,
                "part-number": model_name,
                "serial-number": serial_number,
                "wavelength": wavelength,
            })
            write_csv_file(fields=[
                host,
                vendor_name,
                model_name,
                value['mediaType'],
                serial_number,
            ])
    finally:
        # Always release the SSH session, even if a command or parse fails.
        arista_device.disconnect()
def fetch_juniper_inventory(host, v4_address, username):
    """Collect transceiver details from one Juniper device over Netconf.

    Reads the chassis inventory, then requests the PIC detail for every PIC
    listed directly under each FPC. Every port reported by a PIC is joined
    with its "Xcvr <port>" inventory entry, appended to the module-level
    ``optics_list`` and written as one row through ``write_csv_file``.

    host       -- device name, used for log output and in the recorded rows.
    v4_address -- management address to connect to (converted with str()).
    username   -- login name for the session.

    Returns False when the session cannot be opened, otherwise None.
    """
    print("### Connecting " + username + "@" + host)
    junos_device = Device(host=str(v4_address), user=username, timeout=15)  # assume ~/.ssh/config with correct key
    try:
        junos_device.open()
    except Exception as e:
        print(e.__doc__)
        return False

    try:
        print("## Connected to %s S/N (%s)" % (
            junos_device.facts['hostname'], junos_device.facts['serialnumber']))
        chassis_inventory = junos_device.rpc.get_chassis_inventory()

        for fpc_module in chassis_inventory.findall('chassis/chassis-module'):
            fpc_name = str(fpc_module.findtext('name'))
            if not fpc_name.startswith("FPC"):
                continue
            # "FPC 0" -> "0" (no leading blank in the slot number).
            fpc_slot = fpc_name[len("FPC"):].strip()

            # Only the PICs that sit under this FPC; a chassis-wide PIC query
            # would pair every FPC with the PICs of all the others.
            for pic_module in fpc_module.findall('chassis-sub-module'):
                pic_name = pic_module.findtext('name') or ''
                if 'PIC' not in pic_name:
                    continue
                pic_slot = pic_name.replace("PIC ", "")

                # Inventory entries of this PIC keyed by name ("Xcvr 0", ...);
                # the first entry wins if a name repeats.
                xcvr_modules = {}
                for module in pic_module.findall('chassis-sub-sub-module'):
                    xcvr_modules.setdefault(module.findtext('name'), module)

                pic_detail = junos_device.rpc.get_pic_detail(
                    fpc_slot=fpc_slot, pic_slot=pic_slot, normalize=True)

                # One dict per <port>. Flattening all ports of the PIC into a
                # single dict kept only the values of the last port.
                for port in pic_detail.findall("fpc/pic-detail/port-information/port"):
                    # e.g. { 'cable-type': '100GBASE LR4',
                    #        'fiber-mode': 'SM',
                    #        'port-number': '0',
                    #        'sfp-vendor-fw-ver': '1.0',
                    #        'sfp-vendor-name': 'SumitomoElectric',
                    #        'sfp-vendor-pno': 'SCF1001L4LNJ101',
                    #        'wavelength': '1310 nm'}
                    pic_values = dict((node.tag, node.text) for node in port.findall('.//*'))
                    port_number = pic_values.get('port-number')
                    if not port_number:
                        continue

                    # e.g. { 'description': 'QSFP+-40G-SR4',
                    #        'name': 'Xcvr 0',
                    #        'part-number': 'NON-JNPR',
                    #        'serial-number': '482943AK004W'}
                    xcvr_module = xcvr_modules.get("Xcvr " + port_number)
                    if xcvr_module is None:
                        print("## No inventory entry for FPC " + fpc_slot
                              + " PIC " + pic_slot + " Xcvr " + port_number + " on " + host)
                        port_values = {}
                    else:
                        port_values = dict((node.tag, node.text) for node in xcvr_module)

                    cable_type = pic_values.get('cable-type')
                    if cable_type is not None:
                        cable_type = cable_type.replace(" ", "-")

                    optics_list.append({
                        "host": host,
                        "fpc": fpc_slot,
                        "pic": pic_slot,
                        "port": port_number,
                        "description": port_values.get('description'),
                        "cable_type": cable_type,
                        "sfp-vendor-name": pic_values.get('sfp-vendor-name'),
                        "sfp-vendor-pno": pic_values.get('sfp-vendor-pno'),
                        "part-number": port_values.get('part-number'),
                        "serial-number": port_values.get('serial-number'),
                        "wavelength": pic_values.get('wavelength'),
                    })
                    write_csv_file(fields=[
                        host,
                        pic_values.get('sfp-vendor-name'),
                        pic_values.get('sfp-vendor-pno'),
                        port_values.get('description'),
                        port_values.get('serial-number'),
                    ])
    finally:
        # Always release the Netconf session, even if an RPC or lookup fails.
        junos_device.close()
def write_csv_file(fields, path='./optics_audit.csv'):
    """Append one fully-quoted row to the audit CSV file.

    fields -- sequence of values for the row; None is written as an empty
              quoted field.
    path   -- file to append to; defaults to ./optics_audit.csv in the
              current working directory.
    """
    # The csv module needs newline translation switched off, otherwise some
    # platforms emit "\r\r\n": binary mode on Python 2, newline='' on Python 3.
    if sys.version_info[0] < 3:
        csv_file = open(path, 'ab')
    else:
        csv_file = open(path, 'a', newline='')
    with csv_file:
        csv.writer(csv_file, quoting=csv.QUOTE_ALL).writerow(fields)
if __name__ == "__main__":
    # Module-level accumulator that both inventory functions append to.
    optics_list = []
    # Arista first, then Juniper.
    for vendor in ('arista', 'juniper'):
        fetch_netbox(vendor)
    pp.pprint(optics_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment