Skip to content

Instantly share code, notes, and snippets.

@greyhoundforty
Created April 18, 2023 15:57
Show Gist options
  • Save greyhoundforty/39e6aa9a2f16e2283ac190dc8aca023a to your computer and use it in GitHub Desktop.
Save greyhoundforty/39e6aa9a2f16e2283ac190dc8aca023a to your computer and use it in GitHub Desktop.
ce-schematics-pyton-v2
import os
import logging
import time
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_cloud_sdk_core import ApiException
from ibm_schematics.schematics_v1 import SchematicsV1
from datetime import datetime
from logdna import LogDNAHandler
from dotenv import load_dotenv
load_dotenv()
authenticator = IAMAuthenticator(
apikey=os.environ.get('IBMCLOUD_API_KEY'),
client_id='bx',
client_secret='bx'
)
refreshToken = authenticator.token_manager.request_token()['refresh_token']
workspaceId = os.environ.get('WORKSPACE_ID')
def schematicsClient():
client = SchematicsV1(authenticator=authenticator)
schematicsURL = 'https://us-east.schematics.cloud.ibm.com'
client.set_service_url(schematicsURL)
return client
def getWorkspaceStatus():
client = schematicsClient()
wsStatus = client.get_workspace(w_id=workspaceId).get_result()['status']
return wsStatus
def logDnaLogger():
key = os.environ.get('LOGDNA_INGESTION_KEY')
log = logging.getLogger('logdna')
log.setLevel(logging.INFO)
options = {
'index_meta': True,
'tags': 'testing-updated-logic',
'url': 'https://logs.us-south.logging.cloud.ibm.com/logs/ingest',
'log_error_response': True,
'app': 'testing-updated-logic',
}
logger = LogDNAHandler(key, options)
log.addHandler(logger)
return log
def deleteWorkspaceResources():
log = logDnaLogger()
client = schematicsClient()
wsDestroy = client.destroy_workspace_command(
w_id=workspaceId,
refresh_token=refreshToken
).get_result()
destroyActivityId = wsDestroy.get('activityid')
while True:
workspaceStatus = getWorkspaceStatus()
log.info("Current destroy run status:", workspaceStatus)
if workspaceStatus is None:
log.info("Error getting workspace status. Exiting.")
break
if workspaceStatus == "INACTIVE":
log.info("Workspace status is INACTIVE. Destroy successful.")
break
elif workspaceStatus == "FAILED" or workspaceStatus == "CANCELLED":
log.error("Workspace status is " + workspaceStatus)
log,error("Workspace destroy failed. Please check the logs by running the following command: ibmcloud schematics job logs --id " + destroyActivityId)
break
else:
log.info(f"Workspace status is {workspaceStatus}. Waiting for it to become INACTIVE.")
# Wait for 60 seconds before checking again
time.sleep(60)
def applyWorkspace():
client = schematicsClient()
log = logDnaLogger()
wsApply = client.apply_workspace_command(
w_id=workspaceId,
refresh_token=refreshToken,
).get_result()
applyActivityId = wsApply.get('activityid')
while True:
workspaceStatus = getWorkspaceStatus()
log.info("Current apply run status:", workspaceStatus)
if workspaceStatus is None:
log.info("Error getting workspace status. Exiting.")
break
if workspaceStatus == "ACTIVE":
log.info("Workspace status is ACTIVE, workspace apply successful.")
break
elif workspaceStatus == "INPROGRESS":
log.info(f"Workspace status is {workspaceStatus}. Waiting for it to become ACTIVE.")
# Wait for 60 seconds before checking again. This will need to be adjusted when moving to bare metal deployments.
time.sleep(60)
elif workspaceStatus == "FAILED" or workspaceStatus == "CANCELLED":
log.error("Workspace status is " + workspaceStatus)
log,error("Workspace apply failed. Please check the logs by running the following command: ibmcloud schematics job logs --id " + applyActivityId)
break
else:
log.error(f"Workspace status is {workspaceStatus}. This status caused an exit and needs to be baked in to the logic. Exiting.")
def main():
log = logDnaLogger()
workspaceStatus = getWorkspaceStatus()
if workspaceStatus is None:
log.error("Error getting workspace status. Exiting.")
return
if workspaceStatus == "INACTIVE":
log.info("Workspace is INACTIVE. Running apply function.")
applyWorkspace()
elif workspaceStatus == "ACTIVE":
log.info("Workspace is ACTIVE. Running destroy and apply functions.")
deleteWorkspaceResources()
applyWorkspace()
elif workspaceStatus == "INPROGRESS":
log.info(f"Workspace status is {workspaceStatus}. Waiting for it to become ACTIVE.")
time.sleep(60)
else:
log.error(f"Workspace status is {workspaceStatus}. This status caused an exit and needs to be baked in to the logic. Exiting.")
if __name__ == "__main__":
try:
main()
except Exception as e:
log.error(f"Error occurred: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment