Last active
November 24, 2024 10:49
-
-
Save tchevalleraud/dab0244165c3a42702fe07377f5035f9 to your computer and use it in GitHub Desktop.
XIQSE Scripts - Update Password for EXOS/VOSS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
This script is provided free of charge by Extreme. We hope such scripts are | |
helpful when used in conjunction with Extreme products and technology and can | |
be used as examples to modify and adapt for your ultimate requirements. | |
Extreme will not provide any official support for these scripts. If you do | |
have any questions or queries about any of these scripts you may post on | |
Extreme's community website "The Hub" (https://community.extremenetworks.com/) | |
under the scripting category. | |
ANY SCRIPTS PROVIDED BY EXTREME ARE HEREBY PROVIDED "AS IS", WITHOUT WARRANTY | |
OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES | |
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
IN NO EVENT SHALL EXTREME OR ITS THIRD PARTY LICENSORS BE LIABLE FOR ANY CLAIM, | |
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
ARISING FROM, OUT OF OR IN CONNECTION WITH THE USE OR DISTRIBUTION OF SUCH | |
SCRIPTS. | |
''' | |
########################################################## | |
# XIQ-SE Script: Change password for EXOS / VOSS / ERS # | |
# Written by Thibault Chevalleraud, Sr Extreme Networks # | |
########################################################## | |
__version__ = '1.0' | |
''' | |
#@MetaDataStart | |
#@DetailDescriptionStart | |
###################################################################################### | |
# | |
# This script changes the password of a local CLI account on EXOS / VOSS switches. | |
# | |
####################################################################################### | |
#@DetailDescriptionEnd | |
#@SectionStart (description = "Security account - EXOS") | |
# @VariableFieldLabel ( | |
# description = "Username", | |
# type = string, | |
# required = no, | |
# name = "userInput_exos_username", | |
# ) | |
# @VariableFieldLabel ( | |
# description = "Old Password", | |
# type = string, | |
# required = no, | |
# name = "userInput_exos_password_old", | |
# ) | |
# @VariableFieldLabel ( | |
# description = "New Password", | |
# type = string, | |
# required = no, | |
# name = "userInput_exos_password_new", | |
# ) | |
#@SectionEnd | |
#@SectionStart (description = "Security account - VOSS") | |
# @VariableFieldLabel ( | |
# description = "Account level", | |
# type = string, | |
# required = no, | |
# validValues = [read-write-all, read-write, read-only, layer1, layer2, layer3], | |
# name = "userInput_voss_level", | |
# ) | |
# @VariableFieldLabel ( | |
# description = "Username", | |
# type = string, | |
# required = no, | |
# name = "userInput_voss_username", | |
# ) | |
# @VariableFieldLabel ( | |
# description = "Old Password", | |
# type = string, | |
# required = no, | |
# name = "userInput_voss_password_old", | |
# ) | |
# @VariableFieldLabel ( | |
# description = "New Password", | |
# type = string, | |
# required = no, | |
# name = "userInput_voss_password_new", | |
# ) | |
#@SectionEnd | |
#@MetaDataEnd | |
''' | |
########################################################## | |
# Ludo Standard library; Version 4.02 # | |
# Written by Ludovico Stevens, TME Extreme Networks # | |
########################################################## | |
Debug = False    # Enables debug messages
Sanity = False   # If enabled, config commands are not sent to host (show commands are operational)
##########################################################
try:
    emc_vars
except NameError: # Only an undefined emc_vars means we are not running on XMC Jython...
    # These lines only needed to run XMC Python script locally (on my laptop)
    # They can also be pasted to XMC, but will not execute
    import sys
    import json
    import java.util
    import emc_cli      # Own local replica
    import emc_nbi      # Own local replica
    import emc_results  # Own local replica
    # Json file as 1st argv; otherwise fall back to emc_vars.json in the current directory
    _emcVarsPath = sys.argv[1] if len(sys.argv) > 1 else 'emc_vars.json'
    with open(_emcVarsPath) as _emcVarsFile: # Context manager closes the file once it has been parsed
        emc_vars = json.load(_emcVarsFile)
########################################################## | |
# | |
# IMPORTS: distributed amongst sections below | |
# | |
from distutils.version import LooseVersion | |
# | |
# Base functions | |
# v7 | |
import re # Used by scriptName | |
import time # Used by exitError | |
# Default delay, in seconds, that exitError() passes to time.sleep() before reporting a workflow error
ExitErrorSleep = 10
def debug(debugOutput): # v1
    """Print debugOutput, but only when the global Debug flag is set.

    Use this function to include debugging in the script; set the Debug
    variable at the top of the file to True or False to turn it on or off.
    """
    if not Debug:
        return
    print(debugOutput)
def exitError(errorOutput, sleep=ExitErrorSleep): # v3
    """Exit the script with an error message, setting the result status to ERROR.

    errorOutput : message recorded in the results and carried by the exception
    sleep       : seconds to wait first, when running as a workflow

    Always raises RuntimeError(errorOutput).
    """
    runningAsWorkflow = 'workflowMessage' in emc_vars
    if runningAsWorkflow:
        # When workflow run on multiple devices, want ones that error to be last
        # to complete, so THEY set the workflow message
        time.sleep(sleep)
        for resultKey in ("deviceMessage", "activityMessage", "workflowMessage"):
            emc_results.put(resultKey, errorOutput)
    emc_results.setStatus(emc_results.Status.ERROR)
    raise RuntimeError(errorOutput)
def abortError(cmd, errorOutput): # v1
    """A CLI command failed: send any rollback commands, then exit with an error.

    cmd         : the command (or query description) which failed
    errorOutput : the error text, handed on to exitError()

    rollbackStack() is only called if the script defines it. Without this
    check an undefined rollbackStack raised NameError, which hid the real
    error and skipped exitError() altogether.
    Always raises (via exitError, or from a failing rollback).
    """
    print("Aborting script due to error on previous command")
    try:
        rollback = rollbackStack
    except NameError: # Rollback functions are not part of this script
        rollback = None
    try:
        if rollback:
            rollback()
    finally:
        print("Aborting because this command failed: {}".format(cmd))
    exitError(errorOutput)
def scriptName(): # v1
    """Return the assigned name of the Script or Workflow, or None if unknown.

    A workflow name is taken from emc_vars['workflowName']; a script name is
    the file name (without path and .py extension) held in
    emc_vars['javax.script.filename'].
    """
    if 'workflowName' in emc_vars: # Workflow
        return emc_vars['workflowName']
    if 'javax.script.filename' in emc_vars: # Script
        found = re.search(r'\/([^\/\.]+)\.py$', emc_vars['javax.script.filename'])
        if found:
            return found.group(1)
    return None
def workflow_DeviceMessage(msg): # v1
    """Set the device, activity and workflow result messages from msg.

    If msg contains '<>', it is replaced with the device IP in the device
    message, and with the device list in the activity/workflow messages when
    more than one device is listed. '(s)' and '(es)' are dropped in the
    singular form and become 's' / 'es' in the plural form.
    """
    deviceText = msg
    groupText = msg
    if '<>' in msg:
        devicesListStr = emc_vars['devices'][1:-1] # Drop first and last character of the devices string
        deviceText = msg.replace('<>', emc_vars['deviceIP'])
        deviceText = deviceText.replace('(s)', '').replace('(es)', '')
        moreThanOne = len(devicesListStr.split(',')) > 1
        if moreThanOne:
            groupText = msg.replace('<>', devicesListStr)
            groupText = groupText.replace('(s)', 's').replace('(es)', 'es')
        else:
            groupText = deviceText
    emc_results.put("deviceMessage", deviceText)
    emc_results.put("activityMessage", groupText)
    emc_results.put("workflowMessage", groupText)
# | |
# Family functions | |
# v2 | |
# Family in use by this script; stays None until setFamily() is called
Family = None
# Child family => parent family; children are rolled into the parent for these scripts
FamilyChildren = {
    'Extreme Access Series'           : 'VSP Series',
    'Unified Switching VOSS'          : 'VSP Series',
    'Unified Switching EXOS'          : 'Summit Series',
    'Universal Platform VOSS'         : 'VSP Series',
    'Universal Platform EXOS'         : 'Summit Series',
    'Universal Platform Fabric Engine': 'VSP Series',
    'Universal Platform Switch Engine': 'Summit Series',
}
def setFamily(cliDict={}, family=None): # v2
    """Set and return the global Family variable.

    cliDict : optional dict keyed by family name; if given and the resulting
              family is not one of its keys, the script exits via exitError()
    family  : optional family name to force; otherwise emc_vars["family"] is
              used, mapped to its parent family if listed in FamilyChildren
    """
    global Family
    if family:
        Family = family
    else:
        reportedFamily = emc_vars["family"]
        Family = FamilyChildren.get(reportedFamily, reportedFamily)
    print("Using family type '{}' for this script".format(Family))
    unsupported = bool(cliDict) and Family not in cliDict
    if unsupported:
        exitError('This scripts only supports family types: {}'.format(", ".join(list(cliDict.keys()))))
    return Family
# | |
# CLI functions | |
# v22 | |
import re | |
# Last line of output looks like a CLI prompt
RegexPrompt = re.compile(r'.*[\?\$%#>]\s?$')
# Text which indicates the switch rejected a command
# (not a raw string: \x07 must remain the literal BEL character)
RegexError = re.compile(
    "^%|\x07|error|invalid|cannot|unable|bad|not found|not exist|not allowed|no such|out of range|incomplete|failed|denied|can't|ambiguous|do not|unrecognized",
    re.IGNORECASE | re.MULTILINE
)
# Messages which would be false positives for an error condition, when they are just a warning..
RegexNoError = re.compile(
    'Both ends of MACsec link cannot have the same key-parity value',
    re.IGNORECASE | re.MULTILINE
)
# Commands which enter a config context, one pattern per nesting level; ported from acli.pl
RegexContextPatterns = {
    'ERS Series' : [
        re.compile(r'^(?:interface |router \w+$|route-map (?:"[\w\d\s\.\+-]+"|[\w\d\.-]+) \d+$|ip igmp profile \d+$|wireless|application|ipv6 dhcp guard policy |ipv6 nd raguard policy )'), # level0
        re.compile(r'^(?:security|crypto|ap-profile |captive-portal |network-profile |radio-profile )'), # level1
        re.compile(r'^(?:locale)'), # level2
    ],
    'VSP Series' : [
        re.compile(r'^ *(?:interface |router \w+$|router vrf|route-map (?:"[\w\d\s\.\+-]+"|[\w\d\.-]+) \d+$|application|i-sid \d+|wireless|logical-intf isis \d+|mgmt (?:\d|clip|vlan|oob)|ovsdb$)'), # level0
        re.compile(r'^ *(?:route-map (?:"[\w\d\s\.\+-]+"|[\w\d\.-]+) \d+$)'), # level1
    ],
}
# Commands which leave a config context
RegexExitInstance = re.compile(r'^ *(?:exit|back|end|config|save)(?:\s|$)')
Indent = 3          # Number of space characters for each indentation
LastError = None    # Output of the last CLI command whose error was returned to the caller
ConfigHistory = []  # Config commands sent so far; printed by printConfigSummary()
def cliError(outputStr): # v1
    """Return True if the command output contains a CLI error message.

    Output matching RegexNoError (known warnings) is never treated as an
    error; otherwise any RegexError match counts as one.
    """
    if RegexNoError.search(outputStr):
        return False
    return bool(RegexError.search(outputStr))
def cleanOutput(outputStr): # v3
    """Remove the echoed command and the final prompt from CLI output.

    outputStr : raw output of the command
    Returns the output without its first line (the echoed command) and,
    when the last line matches RegexPrompt, without that line too.
    Output containing a CLI error is returned unchanged; empty output
    returns an empty string.
    """
    if not outputStr:
        # No output at all (presumably possible when the command is sent without
        # waiting for a prompt); previously this raised IndexError below
        return ''
    if cliError(outputStr): # Case where emc_cli.send timesout: "Error: session exceeded timeout: 30 secs"
        return outputStr
    lines = outputStr.splitlines()
    if not lines:
        return ''
    if RegexPrompt.match(lines[-1]):
        kept = lines[1:-1]
    else:
        kept = lines[1:]
    return '\n'.join(kept)
def configChain(chainStr): # v2 - Produces a list of a set of concatenated commands (either with ';' or newlines) | |
chainStr = re.sub(r'\n(\w)(\x0d?\n|\s*;|$)', chr(0) + r'\1\2', chainStr) # Mask trailing "\ny" or "\nn" on commands before making list | |
# Checking for \x0d? is necessary when DOS text files are transferred to XIQ-SE, and read and processed locally.. | |
cmdList = map(str.strip, re.split(r'[;\n]', chainStr)) | |
cmdList = filter(None, cmdList) # Filter out empty lines, if any | |
cmdList = [re.sub(r'\x00(\w)(\x0d?\n|$)', r'\n\1\2', x) for x in cmdList] # Unmask after list made | |
return cmdList | |
def parseRegexInput(cmdRegexStr): # v1
    """Parse the input command regex for both sendCLI_showRegex() and xmcLinuxCommand().

    cmdRegexStr format: <type>://<cli-show-command> [& <additional-show-cmd>]||<regex-to-capture-with>
    Returns (mode, cmdList, regex), where mode is None if no '<type>://'
    prefix was given and cmdList holds the '&'-separated commands.
    """
    mode = None
    if re.match(r'\w+(?:-\w+)?://', cmdRegexStr):
        modePart, remainder = cmdRegexStr.split('://', 1)
        mode = str.strip(modePart)
        cmdRegexStr = str.strip(remainder)
    cmdPart, regexPart = cmdRegexStr.split('||', 1)
    cmd = str.strip(cmdPart)
    regex = str.strip(regexPart)
    cmdList = [str.strip(oneCmd) for oneCmd in cmd.split('&')]
    return mode, cmdList, regex
def formatOutputData(data, mode): # v3
    """Format regex findall data for both sendCLI_showRegex() and xmcLinuxCommand().

    data : list as returned by re.findall (strings, or tuples when the regex
           has more than one capturing bracket)
    mode : the '<type>' requested, or None for the legacy behaviour
    Returns data converted to the requested type.
    Raises RuntimeError on an unknown mode (the exception used to be created
    but never raised, which ended in an UnboundLocalError instead).
    """
    if not mode: # Legacy behaviour same as list
        return data
    if mode == 'bool': # No regex capturing brackets required
        return bool(data)
    if mode == 'str': # Regex should have 1 capturing bracket at most
        return str(data[0]) if data else None
    if mode == 'str-lower': # Same as str but string made all lowercase
        return str(data[0]).lower() if data else None
    if mode == 'str-upper': # Same as str but string made all uppercase
        return str(data[0]).upper() if data else None
    if mode == 'str-join': # Regex with max 1 capturing bracket, joins list to string
        return ''.join(data)
    if mode == 'str-nwlnjoin': # Regex with max 1 capturing bracket, joins list to multi-line string
        return "\n".join(data)
    if mode == 'int': # Regex must have 1 capturing bracket at most
        return int(data[0]) if data else None
    if mode == 'list': # If > 1 capturing brackets, will be list of tuples
        return data
    if mode == 'list-reverse': # Same as list but in reverse order
        return list(reversed(data))
    if mode == 'list-diagonal': # Regex pat1|pat2 = list of tuples; want [0][0],[1][1],etc
        return [data[x][x] for x in range(len(data))]
    if mode == 'tuple': # Regex > 1 capturing brackets, returns 1st tuple
        return data[0] if data else ()
    if mode == 'dict': # Regex must have 2 capturing brackets exactly
        return dict(data)
    if mode == 'dict-reverse': # Same as dict, but key/values will be flipped
        return dict(map(reversed, data))
    if mode == 'dict-both': # Returns 2 dict: dict + dict-reverse
        return dict(data), dict(map(reversed, data))
    if mode == 'dict-diagonal': # {[0][0]:[0][1], [1][2]:[1][3], etc}
        return dict((data[x][x*2], data[x][x*2+1]) for x in range(len(data)))
    if mode == 'dict-sequence': # {[0][0]:[1][1], [2][0]:[3][1], etc}
        # Floor division keeps the pair count an integer
        return dict((data[x*2][0], data[x*2+1][1]) for x in range(len(data) // 2))
    raise RuntimeError("formatOutputData: invalid scheme type '{}'".format(mode))
def sendCLI_showCommand(cmd, returnCliError=False, msgOnError=None): # v2
    """Send a CLI show command and return its cleaned output.

    cmd            : command to send
    returnCliError : if True, a CLI error makes the function return None with
                     the error text held in LastError; otherwise the script
                     aborts via abortError()
    msgOnError     : optional message printed when such an error is ignored
    A failed send exits the script via exitError().
    """
    global LastError
    resultObj = emc_cli.send(cmd)
    if not resultObj.isSuccess():
        exitError(resultObj.getError())
    outputStr = cleanOutput(resultObj.getOutput())
    # If there is output, check for error in 1st 4 lines only (timestamp banner might shift it by 3 lines)
    if outputStr and cliError("\n".join(outputStr.split("\n")[:4])):
        if not returnCliError:
            abortError(cmd, outputStr)
        # We were asked to return upon CLI error: the error message is held in LastError
        LastError = outputStr
        if msgOnError:
            print("==> Ignoring above error: {}\n\n".format(msgOnError))
        return None
    LastError = None
    return outputStr
def sendCLI_showRegex(cmdRegexStr, debugKey=None, returnCliError=False, msgOnError=None): # v1
    """Send a show command and extract values from its output using a regex.

    cmdRegexStr : '<type>://<cmd> [& <cmd>]||<regex>', see parseRegexInput()
    debugKey    : optional label used in the debug output
    Returns the captured data formatted as per '<type>', or None if no
    command produced output.
    Regex is by default case-sensitive; for case-insensitive include (?i) at
    beginning of regex on input string.
    """
    mode, cmdList, regex = parseRegexInput(cmdRegexStr)
    severalCmds = len(cmdList) > 1
    finalCmd = cmdList[-1]
    for cmd in cmdList:
        # If cmdList we try each command in turn until one works; we don't want
        # to bomb out on cmds before the last one in the list
        if severalCmds and cmd != finalCmd:
            ignoreCliError = True
        else:
            ignoreCliError = returnCliError
        outputStr = sendCLI_showCommand(cmd, ignoreCliError, msgOnError)
        if outputStr:
            break
    if not outputStr: # returnCliError true
        return None
    data = re.findall(regex, outputStr, re.MULTILINE)
    debug("sendCLI_showRegex() raw data = {}".format(data))
    # Format we return data in depends on what '<type>://' was pre-pended to the cmd & regex
    value = formatOutputData(data, mode)
    if Debug:
        if debugKey:
            debug("{} = {}".format(debugKey, value))
        else:
            debug("sendCLI_showRegex OUT = {}".format(value))
    return value
def sendCLI_configCommand(cmd, returnCliError=False, msgOnError=None, waitForPrompt=True): # v3
    """Send a CLI config command; return True on success.

    cmd            : command to send
    returnCliError : if True, a CLI error makes the function return False with
                     the error text held in LastError; otherwise the script
                     aborts via abortError()
    msgOnError     : optional message printed when such an error is ignored
    waitForPrompt  : passed through to emc_cli.send()
    In Sanity mode the command is only printed, not sent. Successful commands
    are recorded in ConfigHistory.
    """
    global LastError
    cmdStore = re.sub(r'\n.+$', '', cmd) # Strip added CR+y or similar
    if Sanity:
        print("SANITY> {}".format(cmd))
    else:
        resultObj = emc_cli.send(cmd, waitForPrompt)
        if not resultObj.isSuccess():
            exitError(resultObj.getError())
        outputStr = cleanOutput(resultObj.getOutput())
        # If there is output, check for error in 1st 4 lines only
        if outputStr and cliError("\n".join(outputStr.split("\n")[:4])):
            if not returnCliError:
                abortError(cmd, outputStr)
            # We were asked to return upon CLI error: the error message is held in LastError
            LastError = outputStr
            if msgOnError:
                print("==> Ignoring above error: {}\n\n".format(msgOnError))
            return False
    ConfigHistory.append(cmdStore)
    LastError = None
    return True
def sendCLI_configChain(chainStr, returnCliError=False, msgOnError=None, waitForPrompt=True, abortOnError=True): # v4
    """Send a list of config commands; return True if all of them succeeded.

    Syntax: chainStr can be a multi-line string where individual commands are
    on new lines or separated by the semi-colon ";" character.
    Some embedded directive commands are allowed, these must always begin with
    the hash "#" character:
       #error fail     : If a subsequent command generates an error, make the entire script fail
       #error stop     : If a subsequent command generates an error, do not fail the script but stop processing further commands
       #error continue : If a subsequent command generates an error, ignore it and continue executing remaining commands
    Directives are only recognised before the last command of the chain.

    waitForPrompt only applies to the last command. An empty chain sends
    nothing and returns True (it used to raise IndexError).
    """
    cmdList = configChain(chainStr)
    if not cmdList: # Nothing to send
        return True
    successStatus = True
    for cmd in cmdList[:-1]: # All but last
        embedded = re.match(r'^#error +(fail|stop|continue) *$', cmd)
        if embedded:
            errorMode = embedded.group(1)
            returnCliError = errorMode != 'fail'
            abortOnError = errorMode == 'stop'
            continue # After setting the above, we skip the embedded command
        success = sendCLI_configCommand(cmd, returnCliError, msgOnError)
        if not success:
            successStatus = False
            if abortOnError:
                return False
    # Last now
    success = sendCLI_configCommand(cmdList[-1], returnCliError, msgOnError, waitForPrompt)
    if not success:
        return False
    return successStatus
def printConfigSummary(): # v3
    """Close the CLI session and print all config commands executed.

    Commands are indented by config context, using the context patterns of
    the current Family when there are any. ConfigHistory is emptied afterwards.
    """
    global ConfigHistory
    emc_cli.close()
    if not len(ConfigHistory):
        print("No configuration was performed")
        return
    print("The following configuration was successfully performed on switch:")
    contextPatterns = RegexContextPatterns[Family] if Family in RegexContextPatterns else None
    depth = 0
    for cmd in ConfigHistory:
        padding = ' ' * Indent * depth
        if contextPatterns is not None:
            if contextPatterns[depth].match(cmd):
                # Command enters a context: print at current depth, indent what follows
                print("-> {}{}".format(padding, cmd))
                if depth + 1 < len(contextPatterns):
                    depth += 1
                continue
            if RegexExitInstance.match(cmd):
                # Command leaves a context: print it one level back
                if depth > 0:
                    depth -= 1
                padding = ' ' * Indent * depth
        print("-> {}{}".format(padding, cmd))
    ConfigHistory = []
# | |
# XMC GraphQl NBI functions | |
# v11 | |
from java.util import LinkedHashMap # Used by nbiQuery | |
LastNbiError = None # Error message of the last NBI call whose error was returned to the caller
NbiUrl = None       # When set, nbiQuery()/nbiMutation() use nbiSessionPost() instead of emc_nbi.query()
def recursionKeySearch(nestedDict, returnKey): # v1
    """Search a nested dict for returnKey; used by both nbiQuery() and nbiMutation().

    Keys of the current level are checked before any nested dict is searched.
    Returns (True, value) when found, [None, None] otherwise.
    """
    for key, value in nestedDict.iteritems():
        if key == returnKey:
            return True, value
    for key, value in nestedDict.iteritems():
        # XMC Python is Jython where a dict is in fact a java.util.LinkedHashMap
        if not isinstance(value, (dict, LinkedHashMap)):
            continue
        found, nestedValue = recursionKeySearch(value, returnKey)
        if found:
            return True, nestedValue
    return [None, None] # If we find nothing
def nbiQuery(jsonQueryDict, debugKey=None, returnKeyError=False, **kwargs): # v5
    """Make a GraphQl query of XMC NBI.

    jsonQueryDict  : dict with the query under 'json' and, optionally, the
                     response key to return under 'key'
    debugKey       : optional label used in the debug output
    returnKeyError : if True, return None on an NBI error (message held in
                     LastNbiError) or a missing key, instead of aborting
    kwargs         : each '<name>' in the query is replaced with its value
    Returns the value of the requested key, else the whole response.
    """
    global LastNbiError
    jsonQuery = jsonQueryDict['json']
    for placeholder in kwargs:
        jsonQuery = jsonQuery.replace('<' + placeholder + '>', kwargs[placeholder])
    returnKey = jsonQueryDict['key'] if 'key' in jsonQueryDict else None
    if NbiUrl:
        response = nbiSessionPost(jsonQuery, returnKeyError)
    else:
        response = emc_nbi.query(jsonQuery)
    debug("nbiQuery response = {}".format(response))
    if response == None: # Should only happen from nbiSessionPost if returnKeyError=True
        return None
    if 'errors' in response: # Query response contains errors
        if not returnKeyError:
            abortError("nbiQuery for\n{}".format(jsonQuery), response['errors'][0].message)
        # We were asked to return upon NBI error: the error message is held in LastNbiError
        LastNbiError = response['errors'][0].message
        return None
    LastNbiError = None

    if returnKey: # If a specific key requested, we find it
        foundKey, returnValue = recursionKeySearch(response, returnKey)
        if foundKey:
            if Debug:
                if debugKey:
                    debug("{} = {}".format(debugKey, returnValue))
                else:
                    debug("nbiQuery {} = {}".format(returnKey, returnValue))
            return returnValue
        if returnKeyError:
            return None
        # If requested key not found, raise error
        abortError("nbiQuery for\n{}".format(jsonQuery), 'Key "{}" was not found in query response'.format(returnKey))

    # Else, return the full response
    if Debug:
        if debugKey:
            debug("{} = {}".format(debugKey, response))
        else:
            debug("nbiQuery response = {}".format(response))
    return response
def nbiMutation(jsonQueryDict, returnKeyError=False, debugKey=None, **kwargs): # v5
    """Make a GraphQl mutation query of XMC NBI.

    jsonQueryDict  : dict with the mutation under 'json' and, optionally, a
                     response key to return under 'key'
    returnKeyError : if True, return None on an NBI error (message held in
                     LastNbiError) or a missing key, instead of aborting
    debugKey       : optional label used in the debug output
    kwargs         : each '<name>' in the mutation is replaced with its value
    Returns True (or the requested key's value) when the response status is
    "SUCCESS"; False otherwise, with the response message in LastNbiError;
    None when there is no response or an error was returned to the caller.
    In Sanity mode the mutation is only printed and True is returned.
    """
    global LastNbiError
    jsonQuery = jsonQueryDict['json']
    for key in kwargs:
        jsonQuery = jsonQuery.replace('<'+key+'>', kwargs[key])
    returnKey = jsonQueryDict['key'] if 'key' in jsonQueryDict else None
    if Sanity:
        print("SANITY - NBI Mutation:\n{}\n".format(jsonQuery))
        LastNbiError = None
        return True
    print("NBI Mutation Query:\n{}\n".format(jsonQuery))
    response = nbiSessionPost(jsonQuery, returnKeyError) if NbiUrl else emc_nbi.query(jsonQuery)
    debug("nbiQuery response = {}".format(response))
    if response == None:
        # Same guard as nbiQuery(); without it the 'errors' test below raises TypeError
        return None
    if 'errors' in response: # Query response contains errors
        if returnKeyError: # If we asked to return upon NBI error, then the error message will be held in LastNbiError
            LastNbiError = response['errors'][0].message
            return None
        abortError("nbiQuery for\n{}".format(jsonQuery), response['errors'][0].message)

    def recursionStatusSearch(nestedDict):
        # Find the first 'status' key, current level first, and the 'message' next to it (if any)
        for key, value in nestedDict.iteritems():
            if key == 'status':
                if 'message' in nestedDict:
                    return True, value, nestedDict['message']
                else:
                    return True, value, None
        for key, value in nestedDict.iteritems():
            if isinstance(value, (dict, LinkedHashMap)): # XMC Python is Jython where a dict is in fact a java.util.LinkedHashMap
                foundKey, foundValue, foundMsg = recursionStatusSearch(value)
                if foundKey:
                    return True, foundValue, foundMsg
        return [None, None, None] # If we find nothing

    foundKey, returnStatus, returnMessage = recursionStatusSearch(response)
    if foundKey:
        debug("nbiMutation status = {} / message = {}".format(returnStatus, returnMessage))
    elif not returnKeyError:
        # If status key not found, raise error
        abortError("nbiMutation for\n{}".format(jsonQuery), 'Key "status" was not found in query response')
    if returnStatus == "SUCCESS":
        LastNbiError = None
        if returnKey: # If a specific key requested, we find it
            foundKey, returnValue = recursionKeySearch(response, returnKey)
            if foundKey:
                if Debug:
                    if debugKey:
                        debug("{} = {}".format(debugKey, returnValue))
                    else:
                        debug("nbiQuery {} = {}".format(returnKey, returnValue))
                return returnValue
            if returnKeyError:
                return None
            # If requested key not found, raise error
            abortError("nbiMutation for\n{}".format(jsonQuery), 'Key "{}" was not found in mutation response'.format(returnKey))
        return True
    else:
        LastNbiError = returnMessage
        return False
# | |
# Port processing functions | |
# v5 | |
# Single port: [slot/]port[/chan], with '/' or ':' as separator
RegexPort = re.compile(r'^(?:[1-9]\d{0,2}[/:])?\d+(?:[/:]\d)?$')
# Port range: captures start slot, separator, start port, start chan, end slot, end port, end chan
RegexPortRange = re.compile(r'^(?:([1-9]\d{0,2})([/:]))?(\d+)(?:[/:](\d))?-(?:([1-9]\d{0,2})[/:])?(\d+)(?:[/:](\d))?$')
# All ports of a slot, "slot:*"; XOS only
RegexStarRange = re.compile(r'^([1-9]\d{0,2})(:)\*$')
SlotPortRange = None # Gets set to dict by getSlotPortRanges()
def portValue(port): # v1
    """Return a sort index for a port; pass as sorted(key=...) to sort port lists.

    port : 'slot/port/chan', 'slot/port' or 'port', with '/' or ':' as separator
    The index is slot*400 + port*4 + chan, leaving out the parts not present.
    """
    fields = re.split('[/:]', port)
    fieldCount = len(fields)
    if fieldCount == 3: # slot/port/chan format
        return int(fields[0])*400 + int(fields[1])*4 + int(fields[2])
    if fieldCount == 2: # slot/port format
        return int(fields[0])*400 + int(fields[1])*4
    # standalone port (no slot)
    return int(fields[0])*4
def getSlotPortRanges(): # v1
    """Populate the global SlotPortRange dict.

    For 'Summit Series' the dict is built from the output of 'show slot',
    using the two values captured per slot line; for any other family it is
    set to an empty dict.
    """
    global SlotPortRange
    # Only XOS supported
    slotCommand = {'Summit Series': r'dict://show slot||^Slot-(\d+) +\S+ +\S+ +\S+ +(\d+)'}
    if Family in slotCommand:
        SlotPortRange = sendCLI_showRegex(slotCommand[Family])
        debug("getSlotPortRanges = {}".format(SlotPortRange))
    else:
        SlotPortRange = {}
def generatePortList(portStr, debugKey=None): # v2
    """Validate a port list/range string and return an ordered list of ports.

    portStr  : comma separated ports, ranges ('1/1-1/5', '3-7', '1/2/1-1/2/4')
               or slot star ranges ('2:*'); can also be used for VLAN-id ranges
    debugKey : optional label used in the debug output
    Returns the list of individual ports, sorted with portValue() and without
    duplicates. Invalid entries are reported with print and skipped.
    This version of this function will not handle port ranges which span slots.
    """
    debug("generatePortList IN = {}".format(portStr))
    portDict = {} # Use a dict, will ensure no port duplicate keys
    for port in portStr.split(','):
        port = re.sub(r'^[\s\(]+', '', port) # Remove leading spaces  [ or '(' ]
        port = re.sub(r'[\s\)]+$', '', port) # Remove trailing spaces [ or ')' => XMC bug on ERS standalone units]
        if not len(port): # Skip empty string
            continue
        rangeMatch = RegexPortRange.match(port)
        starMatch = RegexStarRange.match(port)
        if rangeMatch: # We have a range of ports
            startSlot = rangeMatch.group(1)
            separator = rangeMatch.group(2)
            startPort = int(rangeMatch.group(3))
            # The channel groups are None on non-channelized ranges; int(None) used to
            # raise TypeError on every plain range such as '1/1-1/5'
            startChan = int(rangeMatch.group(4)) if rangeMatch.group(4) else None
            endSlot = rangeMatch.group(5)
            endPort = int(rangeMatch.group(6))
            endChan = int(rangeMatch.group(7)) if rangeMatch.group(7) else None
            if endSlot and startSlot != endSlot:
                print("ERROR! generatePortList no support for ranges spanning slots: {}".format(port))
            elif (startChan or endChan) and endPort and startPort != endPort:
                print("ERROR! generatePortList no support for ranges spanning channelized ports: {}".format(port))
            elif not (startChan or endChan) and startPort >= endPort:
                print("ERROR! generatePortList invalid range: {}".format(port))
            elif (startChan or endChan) and startChan >= endChan:
                print("ERROR! generatePortList invalid range: {}".format(port))
            else: # We are good
                if startChan:
                    for portCount in range(startChan, endChan + 1):
                        portDict[startSlot + separator + str(startPort) + separator + str(portCount)] = 1
                else:
                    for portCount in range(startPort, endPort + 1):
                        if startSlot: # slot-based range
                            portDict[startSlot + separator + str(portCount)] = 1
                        else: # simple port range (no slot info)
                            portDict[str(portCount)] = 1
        elif starMatch: # We have a slot/* range
            slot = starMatch.group(1)
            separator = starMatch.group(2)
            if SlotPortRange == None: # Structure not populated
                getSlotPortRanges()
            if SlotPortRange:
                if slot in SlotPortRange:
                    for portCount in range(1, int(SlotPortRange[slot]) + 1):
                        portDict[slot + separator + str(portCount)] = 1
                else:
                    print("Warning: no range for slot {}; skipping: {}".format(slot, port))
            else:
                print("Warning: generatePortList skipping star range as not supported on this switch type: {}".format(port))
        elif RegexPort.match(port): # Port is in valid format
            portDict[port] = 1
        else: # Port is in an invalid format; don't add to dict, print an error message, don't raise exception
            print("Warning: generatePortList skipping unexpected port format: {}".format(port))
    # Sort and return the ports as a list
    portList = sorted(portDict, key=portValue)
    if Debug:
        if debugKey:
            debug("{} = {}".format(debugKey, portList))
        else:
            debug("generatePortList OUT = {}".format(portList))
    return portList
def generatePortRange(portList, debugKey=None): # v2
    """Compact a list of ports into a port list/range string for use on CLI commands.

    portList : list of ports ('port', 'slot/port' or 'slot/port/chan')
    debugKey : optional label used in the debug output
    Returns a comma separated string in which runs of consecutive ports (or
    channels) are collapsed into 'first-last'. The end of a range is written
    in short form (port or channel only) or in full form, depending on the
    rangeMode of the current Family.
    Ported from acli.pl; this version of this function only compacts ranges
    within same slot.
    """
    debug("generatePortRange IN = {}".format(portList))
    rangeMode = {'VSP Series': 2, 'ERS Series': 1, 'Summit Series': 1}

    def closeElement(element, lastInRange):
        # Add '-<last>' if the element being built grew into a range
        if lastInRange:
            return element + '-' + lastInRange
        return element

    elementList = []
    elementBuild = rangeLast = None
    currentType = currentSlot = currentPort = currentChan = None
    # First off, sort the list
    for port in sorted(portList, key=portValue):
        slotPort = re.split("([/:])", port) # Split on '/' (ERS/VSP) or ':'(XOS)
        # slotPort[0] = slot / slotPort[1] = separator ('/' or ':') / slotPort[2] = port / slotPort[4] = channel
        fieldCount = len(slotPort)
        if fieldCount == 5: # slot/port/chan
            if elementBuild:
                if currentType == 's/p' and slotPort[0] == currentSlot and slotPort[2] == currentPort and slotPort[4] == str(int(currentChan)+1):
                    # Next channel of the same port: extend the range
                    currentChan = slotPort[4]
                    if rangeMode[Family] == 1:
                        rangeLast = currentChan
                    else: # rangeMode = 2
                        rangeLast = currentSlot + slotPort[1] + currentPort + slotPort[1] + currentChan
                    continue
                # Range complete
                elementList.append(closeElement(elementBuild, rangeLast))
                rangeLast = None
            currentType = 's/p'
            currentSlot, currentPort, currentChan = slotPort[0], slotPort[2], slotPort[4]
            elementBuild = port
        elif fieldCount == 3: # slot/port
            if elementBuild:
                if currentType == 's/p' and slotPort[0] == currentSlot and slotPort[2] == str(int(currentPort)+1) and not currentChan:
                    # Next port of the same slot: extend the range
                    currentPort = slotPort[2]
                    if rangeMode[Family] == 1:
                        rangeLast = currentPort
                    else: # rangeMode = 2
                        rangeLast = currentSlot + slotPort[1] + currentPort
                    continue
                # Range complete
                elementList.append(closeElement(elementBuild, rangeLast))
                rangeLast = None
            currentType = 's/p'
            currentSlot, currentPort, currentChan = slotPort[0], slotPort[2], None
            elementBuild = port
        elif fieldCount == 1: # simple port (no slot)
            if elementBuild:
                if currentType == 'p' and port == str(int(currentPort)+1):
                    # Next port: extend the range
                    currentPort = port
                    rangeLast = currentPort
                    continue
                # Range complete
                elementList.append(closeElement(elementBuild, rangeLast))
                rangeLast = None
            currentType = 'p'
            currentPort = port
            elementBuild = port
    if elementBuild: # Close off last element we were holding
        elementList.append(closeElement(elementBuild, rangeLast))
    portStr = ','.join(elementList)
    if Debug:
        if debugKey:
            debug("{} = {}".format(debugKey, portStr))
        else:
            debug("generatePortRange OUT = {}".format(portStr))
    return portStr
# | |
# INIT: Init Debug & Sanity flags based on input combos | |
# | |
# 'enable' / 'disable' (any case) in the optional userInput_sanity and userInput_debug
# inputs override the Sanity and Debug defaults; any other value, or a missing input,
# leaves the flag as it is
try:
    Sanity = {'enable': True, 'disable': False}.get(emc_vars['userInput_sanity'].lower(), Sanity)
except:
    pass
try:
    Debug = {'enable': True, 'disable': False}.get(emc_vars['userInput_debug'].lower(), Debug)
except:
    pass
# --> Insert Ludo Threads library here if required <-- | |
# --> XMC Python script actually starts here <-- | |
# | |
# Imports: | |
# | |
# | |
# Variables: | |
# | |
# CLI commands used by main(), per family; a key is left out when the family has no such step
CLI_Dict = {
    'Cisco': {},
    'VSP Series': {
        'disable_more_paging': 'terminal more disable',
        'enable_context':      'enable',
        'config_context':      'config term',
        'exit_config_context': 'exit',
        'end_config':          'end',
        'save_config':         'save config',
    },
    'Summit Series': {
        'disable_more_paging': 'disable cli paging',
        'save_config':         'save configuration',
    },
    'ERS Series': {
        'disable_more_paging': 'terminal length 0',
        'enable_context':      'enable',
        'config_context':      'config term',
        'end_config':          'end',
        'save_config':         'copy config nvram',
    },
}
# | |
# Functions: | |
# | |
def main():
    """Change the password of a local account on an EXOS or VOSS switch.

    The account details come from the userInput_exos_* / userInput_voss_*
    script inputs. The account command is sent first, then the old password
    and the new password twice, each without waiting for a prompt (presumably
    the switch asks for old, new and confirmation in that order - verify on
    the target software release). The config is then saved and a summary of
    the commands is printed.

    Passwords are never printed by this function and are masked in the
    summary. Note that sendCLI_configCommand() still echoes them when Sanity
    mode is enabled.

    The script exits via exitError() if the family has no password procedure
    here, or if a required input for the family is empty.
    """
    hidden = '********'

    def shown(secret):
        # What gets printed in place of a password
        return hidden if secret else '(not set)'

    print("Change password for EXOS / VOSS version {}".format(__version__))
    setFamily(CLI_Dict) # Sets global Family variable; exits on a family with no CLI_Dict entry

    exos_username    = emc_vars["userInput_exos_username"].strip()
    exos_oldPassword = emc_vars["userInput_exos_password_old"].strip()
    exos_newPassword = emc_vars["userInput_exos_password_new"].strip()
    voss_level       = emc_vars["userInput_voss_level"].strip()
    voss_username    = emc_vars["userInput_voss_username"].strip()
    voss_oldPassword = emc_vars["userInput_voss_password_old"].strip()
    voss_newPassword = emc_vars["userInput_voss_password_new"].strip()

    print("#################################################")
    print("Information account EXOS")
    print(" - Username : {}".format(exos_username))
    print(" - Old password : {}".format(shown(exos_oldPassword)))
    print(" - New password : {}".format(shown(exos_newPassword)))
    print("#################################################")
    print("Information account VOSS")
    print(" - Level : {}".format(voss_level))
    print(" - Username : {}".format(voss_username))
    print(" - Old password : {}".format(shown(voss_oldPassword)))
    print(" - New password : {}".format(shown(voss_newPassword)))
    print("#################################################")

    # Per family: inputs which must not be empty, the account command, old and new password
    if Family == 'Summit Series':
        required = {
            'Username': exos_username,
            'Old Password': exos_oldPassword,
            'New Password': exos_newPassword,
        }
        accountCmd = "configure account " + exos_username + " password"
        oldPassword, newPassword = exos_oldPassword, exos_newPassword
    elif Family == 'VSP Series':
        required = {
            'Account level': voss_level,
            'Username': voss_username,
            'Old Password': voss_oldPassword,
            'New Password': voss_newPassword,
        }
        accountCmd = "cli password " + voss_username + " " + voss_level
        oldPassword, newPassword = voss_oldPassword, voss_newPassword
    else:
        # Nothing would be changed on this family; fail rather than report success
        exitError("This script can only change passwords on family types: Summit Series, VSP Series (not '{}')".format(Family))

    missing = sorted(label for label in required if not required[label])
    if missing:
        exitError("Missing input(s) for {}: {}".format(Family, ", ".join(missing)))

    if 'enable_context' in CLI_Dict[Family]:
        sendCLI_configCommand(CLI_Dict[Family]['enable_context'])
    if 'config_context' in CLI_Dict[Family]:
        sendCLI_configCommand(CLI_Dict[Family]['config_context'])

    sendCLI_configCommand(accountCmd, False, None, False)
    secrets = (oldPassword, newPassword, newPassword) # Old password, new password, confirmation
    for secret in secrets:
        sendCLI_configCommand(secret, False, None, False)
    # Each send above either recorded its command in ConfigHistory or aborted the script,
    # so the last entries are the passwords: mask them before the summary is printed
    ConfigHistory[-len(secrets):] = [hidden] * len(secrets)

    if 'end_config' in CLI_Dict[Family]:
        sendCLI_configCommand(CLI_Dict[Family]['end_config'])
    if 'save_config' in CLI_Dict[Family]:
        sendCLI_configCommand(CLI_Dict[Family]['save_config'])

    # Print summary of config performed
    printConfigSummary()

main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment