[vcofrmnlib.py] #python #vro #vco
#!/bin/env python
"""
Python library for calling vCO workflows.
Author: AV
Date: 2018-07-11
Changed 2019-03-18
VMWare has changed format of log timestamps.
Former notation has been following:
time-stamp: UUUUUUUUUUMMM
here U part - seconds from unix epoch, and M part - milliseconds
After vCO 7.5 time-stamp arrives in ISO8601 format:
YYYY-MM-DDTHH:II:SSZ
"""
from __future__ import print_function
import os, sys
import pycurl
import getopt
import json
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
import re
riso8601 = r'^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?(Z|[+-](?:2[0-3]|[01][0-9]):[0-6][0-9])?$'
miso8601 = re.compile( riso8601 ).match
rTZending = r'([+-])(2[0-3]|[01][0-9]):?([0-6][0-9])$'
mTZending = re.compile( rTZending ).search
headers = {}
pycurlRetStatusCode = 0
MAX_ATTEMPTS = 10
POLL_INTERVAL = 15
isVerbose = False
isDebug = False
doTrackLogs = False
def setVerbose( value ):
global isVerbose
isVerbose = value
def setPollInterval( value ):
global POLL_INTERVAL
try:
value = int( value )
except:
value = 5
if value < 5:
value = 5
POLL_INTERVAL = value
def setMaxAttempts( value ):
global MAX_ATTEMPTS
try:
value = int( value )
except:
value = 30
MAX_ATTEMPTS = value
def setTrackLogs( value ):
global doTrackLogs
doTrackLogs = value
def setDebug( value ):
global isDebug
isDebug = value
### helper error print function
def errprint( *args, **kwargs ):
print( *args, file=sys.stderr, **kwargs )
### backend pycurl code
def header_function( header_line ):
"""
Call back function used to parse and keep HTTP headers for pycurl request.
Non-thread safe, as relies on following global variables
headers,
pycurlRetStatusCode
"""
header_line = header_line.decode( 'iso-8859-1' )
    # pycurl.getinfo( pycurl.RESPONSE_CODE ) seems to be inconsistent,
    # therefore we rely on parsing the headers to grab the HTTP status code
global pycurlRetStatusCode
if ( header_line.startswith( 'HTTP' ) ):
hLine = header_line.strip()
import re
        m = re.match( r'HTTP/(?:\d+)(?:\.\d+)?\s+(\d+)', hLine )
if m:
pycurlRetStatusCode = int( m.group(1) )
if ':' not in header_line:
return
# Break the header line into header name and value,
# make header lowercase and so on
name, value = header_line.split(':', 1)
name = name.strip()
value = value.strip()
name = name.lower()
headers[name] = value
def getContentsByUrl( url, method = 'GET', user = None, passw=None, \
postdata = None, \
reqHdrs = ['Content-Type: application/json','Accept-Charset: UTF-8'], \
timeout = None ):
"""
Wrapper for getting contents via pycurl request.
Non-thread safe due to usage of global variables.
Don't fiddle with headers and pycurlRetStatusCode until request is completed.
"""
response = BytesIO()
global headers
global pycurlRetStatusCode
global isDebug
headers = {}
pycurlRetStatusCode = 0
c = pycurl.Curl()
cURLurl = url
majorVersion = sys.version_info[ 0 ]
if majorVersion < 3:
cURLurl = url.encode( 'utf-8' )
c.setopt( c.URL, str( cURLurl ) )
c.setopt( c.WRITEFUNCTION, response.write )
c.setopt( c.HEADERFUNCTION, header_function )
if ( method == 'GET' ):
c.setopt( c.HTTPGET, True )
elif ( method == 'POST' ):
c.setopt( c.POST, True )
else:
raise Exception( 'Wrong HTTP method has been selected, only GET and POST should be used' )
c.setopt( c.VERBOSE, isDebug )
if not user is None and not passw is None:
c.setopt( c.HTTPAUTH, c.HTTPAUTH_BASIC )
if not user is None and not passw is None:
c.setopt( c.USERPWD, user + ':' + passw )
if ( not reqHdrs is None and type( reqHdrs ) is list ):
c.setopt( c.HTTPHEADER, reqHdrs )
if ( not postdata is None ):
pData = postdata
if majorVersion < 3:
pData = str( postdata.encode( 'utf-8' ) )
pDataLen = len( pData )
c.setopt( c.POSTFIELDS, pData )
c.setopt( c.POSTFIELDSIZE, pDataLen )
    # Sometimes certificates may be expired for various reasons.
    # In practice it is pointless to block execution while other teams
    # fix certificate issues, therefore we turn off certificate verification.
c.setopt( c.SSL_VERIFYHOST, 0 )
c.setopt( c.SSL_VERIFYPEER, 0 )
if not timeout is None:
c.setopt( c.CONNECTTIMEOUT, timeout )
httpStatus = 0
try:
c.perform()
httpStatus = pycurlRetStatusCode
except pycurl.error as error:
errno, errstr = error.args
print ("Failed to fetch data for URL: %s" % ( url ) )
print ("Error code: %s, message: %s" % ( errno, errstr ) )
c.close()
httpStatus = pycurlRetStatusCode
body = response.getvalue()
response.close()
return ( headers, body, httpStatus )
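# Illustrative call sketch for getContentsByUrl(); the URL below is a
# hypothetical placeholder, not a real endpoint:
#   ( hdrs, body, status ) = getContentsByUrl(
#       'https://vco.example.com/vco/api/workflows',
#       'GET', 'someuser', 'somepass', timeout = 10 )
#   if status in ( 200, 202 ):
#       data = json.loads( body )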
### helper functions
def getUserConfigFileName():
scriptName = os.path.basename(sys.argv[0])
fName = '.' + scriptName
if fName.endswith( ".py" ):
fName = fName[:-3]
fName = fName + "rc"
return fName
def getDefaultVCOCredentials():
"""
To alleviate problems with passing credentials in command line
one can use $HOME/.<script>rc file to keep these there.
This function merely checks the existence if this file
and tries to read credentials in case it exists.
"""
userHome = ''
userPass = ''
userName = ''
if 'VCOUSER' in os.environ.keys() and 'VCOPASS' in os.environ.keys():
return [ os.environ['VCOUSER'], os.environ['VCOPASS'] ]
if 'HOME' in os.environ.keys():
userHome = os.environ['HOME']
if userHome > '':
if not userHome.endswith( '/' ):
userHome = userHome + '/'
fname = userHome + getUserConfigFileName()
if os.path.isfile( fname ):
with open( fname ) as f:
for line in f:
l = line.rstrip( '\n' )
if l.lower().startswith( 'vcopass=' ):
[ dummy, userPass ] = l.split( '=', 1 )
if l.lower().startswith( 'vcouser=' ):
[ dummy, userName ] = l.split( '=', 1 )
return [ userName, userPass ]
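# The $HOME/.<script>rc file parsed above is expected to contain simple
# key=value lines, e.g. (values are placeholders):
#   vcouser=svc_account
#   vcopass=secret
# The VCOUSER / VCOPASS environment variables, when set, take precedence.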
### validators
def sanitizeVCOProvider( vcoprovider ):
if not vcoprovider.startswith( 'https://' ):
vcoprovider = 'https://' + vcoprovider
if vcoprovider.endswith( '/' ):
import re
vcoprovider = re.sub( '/+$', '', vcoprovider )
return vcoprovider
### Service-level functions for resolving workflow name to workflow id
def getWorkflowID( server, wfName, user, passw ):
"""
Basically this is a wrapper which tries to resolve workflow name
to the workflow id.
The ArchOps team has internal agreement regarding which we should
obtain the latest version of workflow in case there are several workflows
matching the name.
As condition is "like" instead of strict comparison, we'll sort out other
workflows with getLatestWFVersion
"""
url = '%s/vco/api/workflows?conditions=name~%s' % ( server, wfName )
( head, body, httpStatus ) = getContentsByUrl( url, 'GET', user, passw )
href = ''
id = ''
version = ''
if ( httpStatus == 200 or httpStatus == 202 ):
( href, id, version ) = getLatestWFVersion( wfName, body )
else:
raise Exception( "Can't figure out the workflowID for workflow %s as return HTTP status was %s. The response was:\n%s" % ( wfName, httpStatus, body ) )
return ( href, id, version )
def versionParse( version ):
"""
Helper function which allows to sort out versions in notation x.y.z
The x, y and z should be only numeric per internal agreement.
So no versions like 0.0.1a are allowed.
"""
if version is None or version == '':
version = '0'
return tuple( map ( int, ( version.split( '.' ) ) ) )
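# For example, numeric tuple comparison gives the expected ordering where a
# plain string comparison would not:
#   versionParse( '1.10.0' ) > versionParse( '1.9.9' )   # True: ( 1, 10, 0 ) > ( 1, 9, 9 )
#   '1.10.0' > '1.9.9'                                   # False as plain strings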
def getLatestWFVersion( wfName, content ):
"""
Helper function which allows to extract the workflow id
corresponding to the latest version returned by
/vco/api/workflows?conditions=name~<name>
Remember, the like can match several different workflows for example
name~method1
will return method1 and method1_extended
To avoid confusion we match workflow name and skip non-matching workflows.
"""
latestVersion='0'
latestId=''
latestHref=''
try:
data = json.loads( content )
except:
data = {}
tstWFName=''
if not data is None and 'total' in data.keys() and 'link' in data.keys():
for x in data['link']:
href=x['href']
tstWFName=''
version=''
id=''
if 'attributes' in x.keys():
for a in x['attributes']:
if 'name' in a.keys() and 'value' in a.keys():
if a['name'].lower() == 'name':
tstWFName=a['value']
continue
if a['name'].lower() == 'version':
version=a['value']
continue
if a['name'].lower() == 'id':
id=a['value']
continue
if version > '' and id > '' and ( wfName == '' or wfName.lower() == tstWFName.lower() ):
if versionParse( version ) > versionParse( latestVersion ):
latestVersion = version
latestId = id
latestHref = href
return ( latestHref, latestId, latestVersion )
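# The parser above expects the workflow catalogue response to look roughly
# like this (shape inferred from the parsing code, trimmed to the fields used):
#   {
#     "total": 2,
#     "link": [
#       { "href": "https://.../workflows/<id>/",
#         "attributes": [ { "name": "id",      "value": "<id>" },
#                         { "name": "name",    "value": "method1" },
#                         { "name": "version", "value": "1.2.3" } ] },
#       ...
#     ]
#   }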
### racktables helper functions
def fetchRTDataForVM( rtInstance, vmName ):
global isDebug
url = "%s/api/ads/hosts?hostnames=%s&vra=1" % ( rtInstance, vmName )
( headers, body, httpStatus ) = getContentsByUrl( url, 'GET', timeout = 10 )
result = None
if ( httpStatus == 200 ) and len( body ) > 0 :
try:
data = json.loads( body )
if isDebug:
print( json.dumps( data, sort_keys=True, indent=4 ) )
except:
raise Exception( "Unexpected result for getting RT data for host %s" % vmName )
result = next( iter( data or [] ), None )
return ( headers, httpStatus, result )
def queryHostViewFromRT( rtInstance, vmName, field ):
url = "%s/api/hosts/search?q=name~^%s$&fields=name,%s&format=json" % ( rtInstance, vmName, field )
( headers, body, httpStatus ) = getContentsByUrl( url, 'GET', timeout = 10 )
result = None
bFound = False
if ( httpStatus == 200 ) and len( body ) > 0:
try:
data = json.loads( body )
if type( data ) is list:
for x in data:
if (
not x is None and type(x) is dict
and 'name' in x.keys()
and x['name'].lower() == vmName.lower()
and field in x.keys()
):
result = x[field]
bFound = True
break
except:
raise Exception( "Unexpected result for trying to get %s value for %s" % ( field, vmName ) )
return ( headers, httpStatus, result if bFound else None )
def getSegmentValueFromRT( vmName ):
rtArr = [ 'http://rtlab.ringcentral.com', 'http://rt.ringcentral.com' ]
result = None
headers = None
httpStatus = None
for x in rtArr:
( headers, httpStatus, result ) = queryHostViewFromRT( x, vmName, 'segment' )
if not result is None:
break
return ( headers, httpStatus, result )
def getRTDataForVM( vmName ):
rtArr = [ 'http://rtlab.ringcentral.com', 'http://rt.ringcentral.com' ]
result = None
headers = None
httpStatus = None
for x in rtArr:
( headers, httpStatus, result ) = fetchRTDataForVM( x, vmName )
if not result is None:
break
return ( headers, httpStatus, result )
def getValueFromRTDataByKey( key, rtData ):
result = None
if type( rtData ) is dict:
if key in rtData.keys():
result = rtData[key]
else:
raise Exception("The rtData object is not a dict. %s" % ( json.dumps( rtData ) ) )
return result
### execute workflow of VCO
def executeVCOWorkflow( server, wfName, user, passw, postData ):
"""
Execution of workflow by name.
The first step is to resolve name to the workflow ID, then use POST call
passing the postData to the vcoprovider server.
The postData should be valid JSON.
After the POST request has been made, the procedure will poll vcoprovider server
using the traceURL returned in Location header of POST request.
It will stop it either when attempts become greater than MAX_ATTEMPTS limit
or worklfow status value becomes something different of "running" or "waiting for signal"
"""
global POLL_INTERVAL
global MAX_ATTEMPTS
global doTrackLogs
global isDebug
( href, id, version ) = getWorkflowID( server, wfName, user, passw )
wfID = ''
execID = ''
execStatus = ''
httpStatus = 0
majorVersion = sys.version_info[0]
if ( id > '' ):
wfID = id
url = '%s/vco/api/workflows/%s/executions' % ( server, wfID )
( headers, body, httpStatus ) = getContentsByUrl( url, 'POST', user, passw, postData )
execID = ''
oldLogs = {}
bufLogs = {}
if ( ( httpStatus == 200 or httpStatus == 202 ) and 'location' in headers.keys() and headers['location'] > '' ):
traceURL = headers['location']
import re
execID = os.path.basename( re.sub( '/+$', '', traceURL ) )
if isDebug:
print( "DEBUG|execitionID is %s" % ( execID ) )
if execID > '':
            # to avoid the port being changed we reconstruct the traceURL
traceURL = '%s/%s' % ( url, execID )
attemptNumber = 0
doBreak = False
while attemptNumber < MAX_ATTEMPTS:
( chHeaders, chBody, chHttpStatus ) = getContentsByUrl( traceURL, 'GET', user, passw )
if chBody and majorVersion >= 3:
chBody = chBody.decode( 'utf-8' )
if ( chHttpStatus == 200 or chHttpStatus == 202 ) and chBody > '':
data = json.loads( chBody )
if ( 'state' in data.keys() ):
execStatus = data['state'].lower()
if ( execStatus > '' and execStatus != 'waiting for signal' and execStatus != 'running' and execStatus != 'waiting-signal' ):
doBreak = True
import time
time.sleep( 1 )
if ( doTrackLogs ):
( wfIDLogs, execIDLogs, httpStatusLogs, resultLogs ) = getVCOWorkflowLogs( server, wfID, execID, user, passw )
if httpStatusLogs in ( 200, 202 ) and not resultLogs is None:
bufLogs = extractLogs( resultLogs )
printLogs = {}
for key in bufLogs:
if key in oldLogs.keys(): continue
printLogs[ key ] = bufLogs[ key ]
oldLogs = bufLogs
printWFLogs( printLogs )
if doBreak:
break
time.sleep( POLL_INTERVAL -1 )
attemptNumber += 1
else:
raise Exception( "The execution id for workflow id %s is empty for some reason. Please check on the vCO side" % ( wfID ) )
else:
raise Exception( "There was an error during execution workflow. Can't get URL for tracking workflow status. Check parameters" )
return ( wfID, execID, httpStatus, execStatus )
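# A sketch of the postData payload for executeVCOWorkflow(). The exact schema
# is dictated by the vCO/vRO REST API rather than by this library, so treat
# this as an assumption; the parameter name and value are placeholders:
#   postData = json.dumps( {
#       "parameters": [
#           { "name": "vmName", "type": "string",
#             "value": { "string": { "value": "some-vm-01" } } }
#       ]
#   } )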
### Workflow logs handling
def getVCOWorkflowLogs( server, wfID, execID, user, passw ):
"""
Should not hurt to be able to get WF logs in case something goes wrong
"""
url = '%s/vco/api/workflows/%s/executions/%s/logs' % ( server, wfID, execID )
( headers, body, httpStatus ) = getContentsByUrl( url, 'GET', user, passw )
result = {}
if httpStatus == 200 or httpStatus == 202:
try:
data = json.loads( body )
except:
data = None
if not data is None and type( data ) is dict and "logs" in data.keys():
result = { "logs": data["logs"] }
return ( wfID, execID, httpStatus, result )
def getVMWareTimestampFromIsoDate( timestamp ):
from datetime import datetime, timedelta
isISO8601 = False
ms = ""
tmstmp = None
tzSign = ''
tzHour = -1
tzMin = -1
if miso8601( str( timestamp ) ) is not None:
isISO8601 = True
k = timestamp
if k.lower().endswith('z'):
k = k.rstrip( 'Z' )
else:
tzGrp = mTZending( k )
if ( tzGrp ):
tzSign = tzGrp.group(1)
tzHour = int( tzGrp.group(2) )
tzMin = int( tzGrp.group(3) )
k = k.replace( tzGrp.group(0), '' )
if k[-4:-3] == '.':
( k, ms ) = k.split( '.' )
try:
tmstmp = datetime.strptime( k, '%Y-%m-%dT%H:%M:%S' )
except Exception as e:
tmstmp = None
if not tmstmp is None:
if tzSign > '':
tDelta = timedelta( hours = tzHour, minutes = tzMin )
if tzSign == '+':
tmstmp = tmstmp - tDelta
elif tzSign == '-':
tmstmp = tmstmp + tDelta
t = ( tmstmp - datetime( 1970, 1, 1 ))
timestamp = ( t.seconds + t.days * 86400 ) * 1000
if ms:
timestamp = timestamp + int( ms )
return { "timestamp": str( timestamp ), "isISO8601": isISO8601, "ms": ms }
def extractLogs( logObject ):
"""
    Arrange the log entries returned by vCO into a dictionary:
{
timestamp1: { severity: "", description: "" },
timestamp2: { severity: "", description: "" },
...
timestampN: { severity: "", description: "" },
}
"""
entries = []
tNum = 0
if not logObject is None and type( logObject ) is dict and "logs" in logObject.keys():
for x in logObject["logs"]:
entry = None
if type(x) is dict and "entry" in x.keys():
tNum = tNum + 1
entry = x["entry"]
severity = entry['severity'] if 'severity' in entry.keys() else ''
timestamp = entry['time-stamp'] if 'time-stamp' in entry.keys() else ''
tsRecord = getVMWareTimestampFromIsoDate( timestamp )
description = entry['long-description'] if 'long-description' in entry.keys() else ''
if description == '':
description = entry['short-description'] if 'short-description' in entry.keys() else ''
entries.append( { 'timestamp': tsRecord['timestamp'], 'severity': severity, 'description': description, 'tNum': tNum } )
count = len( entries )
""" We have to make consistent numbering of log records to have nice log output """
res = {}
sNum = 0
entries.sort( key = lambda d: -d['tNum'] )
for item in entries:
key = "_".join( [ str( item['timestamp'] ), str( sNum ) ] )
item['seqNumber'] = sNum
res[ key ] = item
sNum = sNum + 1
return res
def termSupportsColors():
p = sys.platform
isSupported = ( p.lower() != 'win32' or 'ANSICON' in os.environ )
isTTY = hasattr( sys.stdout, 'isatty' ) and sys.stdout.isatty()
    # colors only make sense when the platform supports ANSI escapes AND stdout is a TTY
    return isSupported and isTTY
def printWFLogs( entries ):
import datetime
INFO = ''
OKGREEN = ''
WARNING = ''
FAIL = ''
ENDC = ''
if termSupportsColors():
INFO = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
for key in sorted( entries ):
timestamp = entries[ key ]['timestamp']
fraction = ''
if ( len( timestamp ) > 10 ):
fraction = timestamp[10:]
timestamp = timestamp[0:10]
timestamp = datetime.datetime.utcfromtimestamp( int( timestamp ) ) \
.strftime( '%Y-%m-%dT%H:%M:%S' )
bgn = ENDC
entry = entries[ key ]
severity = entries[key]['severity']
description = entries[key]['description']
if severity == "info":
if description.startswith( 'Workflow' ) and description.endswith( 'has completed'):
bgn = OKGREEN
elif severity == "warning":
bgn = WARNING
elif severity == "error":
bgn = FAIL
print( "{BGN}{timestamp}.{fraction}Z|{severity}|{description}{ENDC}".format(
timestamp = timestamp, fraction = fraction.zfill(3), severity = severity,
description = description,
BGN = bgn, ENDC=ENDC ) )
### workflow output parameters reader
def getVCOWorkflowOutputParameters( server, wfID, execID, user, passw ):
"""
Some workflows might return values, so not only status is important,
but we want to know the result. The output-parameters section can be extracted
in case one knows workflow id and execution id pair.
"""
url = '%s/vco/api/workflows/%s/executions/%s/' % ( server, wfID, execID )
( headers, body, httpStatus ) = getContentsByUrl( url, 'GET', user, passw )
result = ''
if httpStatus == 200 or httpStatus == 202:
try:
data = json.loads( body )
except:
data = None
if not data is None and 'output-parameters' in data.keys():
result = json.dumps( { 'output-parameters' : data['output-parameters'] } )
return ( wfID, execID, httpStatus, result )
def internalWorkflowCall( server, user, passw, wfName, data ):
"""Used internally in wrapper functions"""
result = False
( wfID, execID, httpStatus, execStatus ) = executeVCOWorkflow( server, wfName, user, passw, data )
    # note the parentheses: success requires an OK HTTP status AND a completed execution
    if ( ( httpStatus == 200 or httpStatus == 202 ) and execStatus == 'completed' ):
return ( httpStatus, wfID, execID, execStatus, result )
else:
if httpStatus != 200 and httpStatus != 202:
raise Exception( "There was an error running VCO Workflow %s, the HTTP status code: %s" % ( wfName, httpStatus ) )
if execStatus != "completed":
url = "%s/vco/api/workflows/%s/executions/%s" % ( server, wfID, execID )
raise Exception(
"""The execution status of VCO Workflow %s is not completed, but %s.
Check the status of workflow with id %s and execution id %s
%s/
%s/logs
""" % \
( wfName, execStatus, wfID, execID, url, url )
)
def instantiateVCOWorkflowPresentation( server, wfID, user, passw, params ):
url = '%s/vco/api/workflows/%s/presentation/instances/' % ( server, wfID )
( headers, body, httpStatus ) = getContentsByUrl( url, 'POST', user, passw, params )
data = None
if httpStatus == 200 or httpStatus == 202:
try:
data = json.loads( body )
except:
data = None
return ( wfID, httpStatus, data )
def getVCOProviderFromRTData( rtData ):
return getValueFromRTDataByKey( 'vcoprovider', rtData )
def getLocationFromRTData( rtData ):
return getValueFromRTDataByKey( 'location', rtData )
def makeVMOperationGenericWrapper( user, passw, vmName, wfName, data, printOutputParams ):
( headers, httpStatus, vmData ) = getRTDataForVM( vmName )
vcoprovider = ''
location = ''
if not vmData is None:
vcoprovider = getVCOProviderFromRTData( vmData )
location = getLocationFromRTData( vmData )
if ( vcoprovider == '' ):
errprint("Can't get vcoprovider for VM %s, please check RackTables" % ( vmName ))
sys.exit(1)
if ( location == '' ):
errprint("Can't get location for VM %s, please check RackTables" % ( vmName ))
sys.exit(1)
data = data.replace( "#location#", location )
vcoprovider = sanitizeVCOProvider( vcoprovider )
( httpStatus, wfID, execID, execStatus, result ) = internalWorkflowCall( vcoprovider, user, passw, wfName, data )
( wfIDOP, execIDOP, httpStatusOP, resultOP ) = ( None, None, None, '' )
if ( printOutputParams ):
( wfIDOP, execIDOP, httpStatusOP, resultOP ) = getVCOWorkflowOutputParameters( vcoprovider, wfID, execID, user, passw )
if ( httpStatus in ( 200, 202 ) ):
return ( wfID, execID, httpStatus, execStatus, result, resultOP )
else:
raise Exception( "The VCO Workflow %s with execution id %s has returned non OK HTTP status %s" % ( wfID, execID, httpStatus ) )