Created
April 18, 2023 15:57
-
-
Save greyhoundforty/39e6aa9a2f16e2283ac190dc8aca023a to your computer and use it in GitHub Desktop.
ce-schematics-pyton-v2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import time | |
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator | |
from ibm_cloud_sdk_core import ApiException | |
from ibm_schematics.schematics_v1 import SchematicsV1 | |
from datetime import datetime | |
from logdna import LogDNAHandler | |
from dotenv import load_dotenv | |
# Load variables from a local .env file (if present) into the environment
# before any of the lookups below read them.
load_dotenv()

# Shared IAM authenticator used by every Schematics client in this module.
# NOTE(review): the 'bx' client id/secret are presumably what make the token
# response include a refresh token -- confirm against the IAM documentation.
authenticator = IAMAuthenticator(
    apikey=os.getenv('IBMCLOUD_API_KEY'),
    client_id='bx',
    client_secret='bx',
)

# Request a token once at import time and keep its refresh token; the
# destroy/apply commands below pass it along with each request.
refreshToken = authenticator.token_manager.request_token()['refresh_token']

# Id of the Schematics workspace this script operates on.
workspaceId = os.getenv('WORKSPACE_ID')
def schematicsClient():
    """Build a Schematics API client bound to the us-east endpoint.

    Returns:
        A new ``SchematicsV1`` instance that authenticates with the
        module-level ``authenticator`` and has its service URL set.
    """
    service = SchematicsV1(authenticator=authenticator)
    service.set_service_url('https://us-east.schematics.cloud.ibm.com')
    return service
def getWorkspaceStatus():
    """Fetch the current status of the configured Schematics workspace.

    Returns:
        The value of the ``status`` field from the ``get_workspace`` response
        for ``workspaceId``, or ``None`` when the response carries no such
        field. Callers in this file treat ``None`` as "status could not be
        determined".
    """
    client = schematicsClient()
    workspace = client.get_workspace(w_id=workspaceId).get_result()
    # Use .get() rather than ['status']: a missing key used to raise KeyError,
    # which made every caller's ``is None`` check unreachable.
    return workspace.get('status')
def logDnaLogger():
    """Return the shared ``'logdna'`` logger, configured at INFO level.

    The first call attaches a ``LogDNAHandler`` built from the
    ``LOGDNA_INGESTION_KEY`` environment variable. Later calls return the same
    logger without attaching anything: ``logging.getLogger`` always hands back
    the same object for a given name, so adding a handler on every call made
    each record get shipped once per call to this function.

    Returns:
        The ``logging.Logger`` named ``'logdna'``.
    """
    log = logging.getLogger('logdna')
    log.setLevel(logging.INFO)

    # Only attach a handler when the logger has none yet; this function is
    # called from main() and from each workspace operation.
    if not log.handlers:
        key = os.environ.get('LOGDNA_INGESTION_KEY')
        options = {
            'index_meta': True,
            'tags': 'testing-updated-logic',
            'url': 'https://logs.us-south.logging.cloud.ibm.com/logs/ingest',
            'log_error_response': True,
            'app': 'testing-updated-logic',
        }
        log.addHandler(LogDNAHandler(key, options))

    return log
def deleteWorkspaceResources():
    """Start a destroy of the workspace's resources and poll until it ends.

    Issues ``destroy_workspace_command`` for ``workspaceId`` and then checks
    the workspace status every 60 seconds. Polling stops when:

    * no status could be read (``None``),
    * the status is ``INACTIVE`` (logged as a successful destroy), or
    * the status is ``FAILED`` or ``CANCELLED`` (logged as an error together
      with the activity id of the destroy command).

    Any other status is logged and polled again. Returns ``None``.
    """
    log = logDnaLogger()
    client = schematicsClient()
    wsDestroy = client.destroy_workspace_command(
        w_id=workspaceId,
        refresh_token=refreshToken,
    ).get_result()
    destroyActivityId = wsDestroy.get('activityid')

    while True:
        workspaceStatus = getWorkspaceStatus()
        # The status must go through a %s placeholder: passing it as a bare
        # extra argument makes logging fail to format the record.
        log.info("Current destroy run status: %s", workspaceStatus)

        if workspaceStatus is None:
            log.info("Error getting workspace status. Exiting.")
            break

        if workspaceStatus == "INACTIVE":
            log.info("Workspace status is INACTIVE. Destroy successful.")
            break

        if workspaceStatus in ("FAILED", "CANCELLED"):
            log.error("Workspace status is %s", workspaceStatus)
            # %s formatting keeps this safe when the response carried no
            # activity id (string concatenation with None raises TypeError).
            log.error(
                "Workspace destroy failed. Please check the logs by running the following command: ibmcloud schematics job logs --id %s",
                destroyActivityId,
            )
            break

        log.info(f"Workspace status is {workspaceStatus}. Waiting for it to become INACTIVE.")
        # Wait for 60 seconds before checking again
        time.sleep(60)
def applyWorkspace():
    """Start an apply on the workspace and poll until it ends.

    Issues ``apply_workspace_command`` for ``workspaceId`` and then checks the
    workspace status. Polling stops when:

    * no status could be read (``None``),
    * the status is ``ACTIVE`` (logged as a successful apply),
    * the status is ``FAILED`` or ``CANCELLED`` (logged as an error together
      with the activity id of the apply command), or
    * the status is anything other than ``INPROGRESS`` (logged as an
      unhandled status).

    While the status is ``INPROGRESS`` the function sleeps 60 seconds between
    checks. Returns ``None``.
    """
    client = schematicsClient()
    log = logDnaLogger()
    wsApply = client.apply_workspace_command(
        w_id=workspaceId,
        refresh_token=refreshToken,
    ).get_result()
    applyActivityId = wsApply.get('activityid')

    while True:
        workspaceStatus = getWorkspaceStatus()
        # The status must go through a %s placeholder: passing it as a bare
        # extra argument makes logging fail to format the record.
        log.info("Current apply run status: %s", workspaceStatus)

        if workspaceStatus is None:
            log.info("Error getting workspace status. Exiting.")
            break

        if workspaceStatus == "ACTIVE":
            log.info("Workspace status is ACTIVE, workspace apply successful.")
            break
        elif workspaceStatus == "INPROGRESS":
            log.info(f"Workspace status is {workspaceStatus}. Waiting for it to become ACTIVE.")
            # Wait for 60 seconds before checking again. This will need to be adjusted when moving to bare metal deployments.
            time.sleep(60)
        elif workspaceStatus in ("FAILED", "CANCELLED"):
            log.error("Workspace status is %s", workspaceStatus)
            # %s formatting keeps this safe when the response carried no
            # activity id (string concatenation with None raises TypeError).
            log.error(
                "Workspace apply failed. Please check the logs by running the following command: ibmcloud schematics job logs --id %s",
                applyActivityId,
            )
            break
        else:
            log.error(f"Workspace status is {workspaceStatus}. This status caused an exit and needs to be baked in to the logic. Exiting.")
            # Actually leave the loop: without this break an unhandled status
            # was re-polled immediately and forever, with no sleep in between.
            break
def main():
    """Decide what to run based on the workspace's current status.

    * ``None``        -> log an error and return.
    * ``INACTIVE``    -> run ``applyWorkspace``.
    * ``ACTIVE``      -> run ``deleteWorkspaceResources`` then ``applyWorkspace``.
    * ``INPROGRESS``  -> log, sleep 60 seconds, and return.
    * anything else   -> log an error.

    Returns ``None`` in every case.
    """
    logger = logDnaLogger()
    workspaceStatus = getWorkspaceStatus()

    if workspaceStatus is None:
        logger.error("Error getting workspace status. Exiting.")
        return

    if workspaceStatus == "INACTIVE":
        logger.info("Workspace is INACTIVE. Running apply function.")
        applyWorkspace()
        return

    if workspaceStatus == "ACTIVE":
        logger.info("Workspace is ACTIVE. Running destroy and apply functions.")
        deleteWorkspaceResources()
        applyWorkspace()
        return

    if workspaceStatus == "INPROGRESS":
        # NOTE(review): this sleeps once and returns; the status is not
        # re-checked afterwards and no apply/destroy is started.
        logger.info(f"Workspace status is {workspaceStatus}. Waiting for it to become ACTIVE.")
        time.sleep(60)
        return

    logger.error(f"Workspace status is {workspaceStatus}. This status caused an exit and needs to be baked in to the logic. Exiting.")
# Script entry point: run main() and report any uncaught failure to the log.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # ``log`` was never defined at module scope, so the original handler
        # raised NameError instead of logging; fetch the logger explicitly.
        logDnaLogger().error(f"Error occurred: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment