# Rolling IaaS Schematics via Code Engine
#
# Source: GitHub gist greyhoundforty/a3a5d4fc986989e975c251c2534f26fd
# Author: @greyhoundforty (created July 5, 2023 21:24)
import os
import sys
import time
from logdna import LogDNAHandler
import logging
from ibm_cloud_sdk_core import ApiException
from ibm_schematics.schematics_v1 import SchematicsV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
import SoftLayer
from SoftLayer import HardwareManager
from ibm_code_engine_sdk.code_engine_v2 import CodeEngineV2
# Required configuration, read from the environment. Each value is checked
# immediately so a missing setting fails fast with a specific message.
ibmApiKey = os.getenv('IBMCLOUD_API_KEY')
if not ibmApiKey:
    raise ValueError("IBMCLOUD_API_KEY environment variable not found")

workspaceId = os.getenv('WORKSPACE_ID')
if not workspaceId:
    raise ValueError("WORKSPACE_ID environment variable not found")

projectId = os.getenv('CE_PROJECT_ID')
if not projectId:
    raise ValueError("CE_PROJECT_ID environment variable not found")

# Shared IAM authenticator used by the Schematics and Code Engine clients.
# NOTE(review): the 'bx' client id/secret pair is presumably what makes the
# token response include a refresh token -- confirm against IBM IAM docs.
authenticator = IAMAuthenticator(
    apikey=ibmApiKey,
    client_id='bx',
    client_secret='bx',  # pragma: allowlist secret
)

# Requested once at import time; passed to the Schematics destroy/apply
# commands below.
refreshToken = authenticator.token_manager.request_token()['refresh_token']
def logDnaLogger():
    """Return the shared ``logdna`` logger, attaching its handler only once.

    ``logging.getLogger`` returns the same logger object for a given name,
    and this function is called from many places (including inside polling
    loops). Adding a new handler on every call would emit each record once
    per accumulated handler, so a handler is only attached when the logger
    does not have one yet.

    The ingestion key is read from the ``LOGDNA_INGESTION_KEY`` environment
    variable and passed to ``LogDNAHandler`` as-is.

    Returns:
        logging.Logger: the ``logdna`` logger at INFO level.
    """
    log = logging.getLogger('logdna')
    log.setLevel(logging.INFO)

    # Guard against stacking duplicate handlers on repeated calls.
    if not log.handlers:
        key = os.environ.get('LOGDNA_INGESTION_KEY')
        options = {
            'index_meta': True,
            'tags': 'rolling-iaas',
            'url': 'https://logs.private.us-south.logging.cloud.ibm.com/logs/ingest',
            'log_error_response': True,
            'app': 'refresh-job',
        }
        log.addHandler(LogDNAHandler(key, options))

    return log
def slClient():
    """Build a SoftLayer API client authenticated with the IBM Cloud API key.

    Returns:
        The client object produced by ``SoftLayer.create_client_from_env``,
        using the literal username ``'apikey'`` and the module-level
        ``ibmApiKey``.
    """
    return SoftLayer.create_client_from_env(
        username='apikey',
        api_key=ibmApiKey,
    )
def schematicsClient():
    """Create a Schematics client pointed at the private us-south endpoint.

    Returns:
        SchematicsV1: client using the module-level ``authenticator``.
    """
    service = SchematicsV1(authenticator=authenticator)
    service.set_service_url('https://private-us-south.schematics.cloud.ibm.com')
    return service
def codeEngineClient():
    """Create a Code Engine v2 client pointed at the us-south API endpoint.

    Returns:
        CodeEngineV2: client using the module-level ``authenticator``.
    """
    service = CodeEngineV2(authenticator=authenticator)
    service.set_service_url('https://api.us-south.codeengine.cloud.ibm.com/v2')
    return service
def getDepolyedServerId():
    """Return the deployed server's instance ID from the workspace outputs.

    Reads ``[0]['output_values'][0]['instance_id']['value']`` from the result
    of ``get_workspace_outputs`` for the configured workspace.

    A workspace whose outputs are empty or lack ``instance_id`` is treated as
    "no server deployed" instead of raising ``IndexError``/``KeyError``, so
    the caller can still proceed.
    NOTE(review): presumably the outputs are empty when nothing is deployed
    (e.g. an INACTIVE workspace) -- confirm against the Schematics API.

    Returns:
        str | None: the instance ID when it is present and a string;
        otherwise ``None`` (after logging an informational message).
    """
    log = logDnaLogger()
    schematics = schematicsClient()
    wsOutputs = schematics.get_workspace_outputs(w_id=workspaceId).get_result()

    try:
        deployedServerId = wsOutputs[0]['output_values'][0]['instance_id']['value']
    except (IndexError, KeyError, TypeError):
        # Outputs missing or shaped differently: nothing is deployed.
        deployedServerId = None

    if isinstance(deployedServerId, str):
        return deployedServerId

    log.info("No currently deployed server found. Skipping tagging.")
    return None
def callCodeEnginejob(deployedServer):
    """Start a run of the ``reclaim-server-job`` Code Engine job.

    Args:
        deployedServer: value passed to the job run as its single run
            argument.

    The ID of the created job run is written to the log.
    """
    log = logDnaLogger()
    jobRun = codeEngineClient().create_job_run(
        project_id=projectId,
        job_name='reclaim-server-job',
        run_arguments=[deployedServer],
    ).get_result()
    jobId = jobRun['id']
    log.info(f"Code Engine job started. Job run ID: {jobId}")
def attachTag(instanceId):
    """Set the ``reclaim_immediately`` tag on a hardware instance.

    Args:
        instanceId: hardware ID forwarded to ``HardwareManager.edit``.
    """
    manager = HardwareManager(slClient())
    manager.edit(hardware_id=instanceId, tags='reclaim_immediately')
def tagDeployedServerId():
    """Tag the currently deployed server, if the workspace reports one.

    Reads ``[0]['output_values'][0]['instance_id']['value']`` from the
    workspace outputs and passes it to ``attachTag`` when it is a string.

    A workspace whose outputs are empty or lack ``instance_id`` is treated as
    "no server deployed": tagging is skipped with a log message rather than
    failing with ``IndexError``/``KeyError``.
    NOTE(review): presumably a FAILED or never-applied workspace can have no
    outputs -- confirm against the Schematics API.
    """
    log = logDnaLogger()
    schematics = schematicsClient()
    wsOutputs = schematics.get_workspace_outputs(w_id=workspaceId).get_result()

    try:
        deployedServerId = wsOutputs[0]['output_values'][0]['instance_id']['value']
    except (IndexError, KeyError, TypeError):
        # Outputs missing or shaped differently: nothing to tag.
        deployedServerId = None

    if isinstance(deployedServerId, str):
        attachTag(instanceId=deployedServerId)
    else:
        log.info("No currently deployed server found. Skipping tagging.")
def getWorkspaceStatus():
    """Fetch the workspace and return its ``status`` field as a string.

    Returns:
        str: ``str()`` of the workspace's ``status`` value.

    On ``ApiException`` the error is logged and the process exits with
    status 1.
    """
    log = logDnaLogger()
    schematics = schematicsClient()
    try:
        details = schematics.get_workspace(w_id=workspaceId).get_result()
        return str(details['status'])
    except ApiException as e:
        log.error(f"Error getting workspace status: {e}")
        sys.exit(1)
def destroyWorkspaceResources():
    """Issue a Schematics destroy and poll until the workspace settles.

    Polling stops when the status becomes ``INACTIVE`` (success) or
    ``FAILED``/``CANCELLED`` (logged as an error, with the activity ID).
    Any other status is logged and re-checked after a one minute wait.

    On ``ApiException`` the error is logged and the process exits with
    status 1.
    """
    log = logDnaLogger()
    schematics = schematicsClient()
    try:
        destroyResult = schematics.destroy_workspace_command(
            w_id=workspaceId,
            refresh_token=refreshToken,
        ).get_result()
        destroyActivityId = destroyResult.get('activityid')
        log.info("Destroying workspace resources")

        while True:
            # Short pause before every status read.
            time.sleep(5)
            status = getWorkspaceStatus()

            if status == "INACTIVE":
                log.info("Resources destroyed successfully.")
                # Extra wait after a successful destroy before returning.
                time.sleep(60)
                break

            if status in ["FAILED", "CANCELLED"]:
                log.error(f"Destroy operation {status}")
                log.error(f"Destroy activity ID: {destroyActivityId}")
                break

            log.info("Waiting for workspace resources to be destroyed")
            log.info("Next status check in 1 minute ..")
            log.info(f"Current workspace status: {status}")
            time.sleep(60)
    except ApiException as e:
        log.error(f"Error destroying resources: {e}")
        sys.exit(1)
def applyWorkspaceResources():
    """Issue a Schematics apply and poll until the workspace settles.

    Polling stops when the status becomes ``ACTIVE`` (success) or
    ``FAILED``/``CANCELLED`` (logged as an error, with the activity ID).
    Any other status is logged and re-checked after a ten minute wait.

    On ``ApiException`` the error is logged and the process exits with
    status 1.
    """
    log = logDnaLogger()
    schematics = schematicsClient()
    try:
        applyResult = schematics.apply_workspace_command(
            w_id=workspaceId,
            refresh_token=refreshToken,
        ).get_result()
        applyActivityId = applyResult.get('activityid')
        log.info("Provisioning workspace resources")

        while True:
            # Short pause before every status read.
            time.sleep(5)
            status = getWorkspaceStatus()

            if status == "ACTIVE":
                log.info("Resources provisioned successfully.")
                break

            if status in ["FAILED", "CANCELLED"]:
                log.error(f"Apply operation {status}")
                log.error(f"Apply activity ID: {applyActivityId}")
                break

            log.info("Waiting for resources to be provisioned. Next status check in 10 minutes...")
            log.info(f"Current workspace status: {status}")
            time.sleep(600)
    except ApiException as e:
        log.error(f"Error applying resources: {e}")
        sys.exit(1)
def main():
    """Run one hardware refresh cycle based on the current workspace status.

    * ``INACTIVE``: apply, then start the Code Engine job.
    * ``ACTIVE``: tag the deployed server, destroy, apply, then start the
      Code Engine job.
    * ``FAILED``: run the same tag/destroy/apply/job sequence up to three
      times while the workspace stays ``FAILED``; exit with status 1 if it
      is still ``FAILED`` afterwards.
    * anything else: log the status, wait 60 seconds and return.

    The deployed server ID is read once, before any destroy/apply, and that
    value is what gets passed to the Code Engine job.
    """
    log = logDnaLogger()
    status = getWorkspaceStatus()
    deployedServer = getDepolyedServerId()
    log.info(f"Starting hardware refresh. Current workspace status: {status}")

    if status == "INACTIVE":
        applyWorkspaceResources()
        callCodeEnginejob(deployedServer)
    elif status == "ACTIVE":
        tagDeployedServerId()
        destroyWorkspaceResources()
        applyWorkspaceResources()
        callCodeEnginejob(deployedServer)
    elif status == "FAILED":
        attempts = 0
        while attempts < 3 and status == "FAILED":
            log.info("Workspace is marked as Failed.")
            log.info(f"Automated recorvery attempt: {attempts + 1}/3")
            tagDeployedServerId()
            destroyWorkspaceResources()
            applyWorkspaceResources()
            callCodeEnginejob(deployedServer)
            status = getWorkspaceStatus()
            attempts += 1
        if status == "FAILED":
            log.error(f"Workspace is marked as: {status} for the 3rd time. Exiting.")
            # sys.exit rather than the site-provided exit(), which is not
            # guaranteed to exist (e.g. python -S or frozen interpreters).
            sys.exit(1)
    else:
        log.info("Workspace is currently not in a valid state to run destroy/apply actions")
        log.info(f"Current workspace status: {status}")
        log.info("Polling again in 60 seconds.")
        # NOTE(review): no re-poll happens here; after the sleep main()
        # returns. Presumably an external scheduler re-runs the job -- confirm.
        time.sleep(60)


if __name__ == "__main__":
    main()
# End of gist (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment").