Skip to content

Instantly share code, notes, and snippets.

@kaugm
Created May 22, 2023 17:11
Show Gist options
  • Save kaugm/fc11e5df31976af4df6214b6b44ce1b4 to your computer and use it in GitHub Desktop.
Save kaugm/fc11e5df31976af4df6214b6b44ce1b4 to your computer and use it in GitHub Desktop.
Refresh Ocean nodes based on uptime by detaching them.
#!/opt/homebrew/bin/python3
"""
Proof of Concept: Refresh nodes in Ocean Cluster once they reach a certain uptime. Script created per customer request for this functionality.
Purpose: Currently, Ocean does not support the ability to automatically detach "old" nodes once they reach a certain uptime and spin up fresh nodes to replace them.
How it works:
1. Pull the current node count of the Ocean cluster via the API
2. Loop through the list of nodes
3. For each node, if its uptime is greater than a predefined limit, detach it
4. Ocean should automatically spin up a replacement node
Note: Ensure that continuous optimization is enabled.
Further Development: Adjust the API call to allow for functionality with AWS ECS, Azure AKS, and GCP GKE.
"""
# Standard-library modules are imported unconditionally so that `os` is always
# bound by the time the handler below needs it to exit.
import json
import os
import re
from datetime import datetime, timedelta

# `requests` is the only third-party dependency; give a helpful hint if it is missing.
try:
    import requests
except ModuleNotFoundError:
    # flush=True because os._exit() terminates without flushing stdio buffers.
    print("Please ensure proper modules are installed:\npip3 install requests", flush=True)
    os._exit(1)
# Get Required Variables for Authentication
TOKEN = os.environ.get('SPOTINST_TOKEN')
ACCOUNT = os.environ.get('SPOTINST_ACCOUNT_AWS')

# Check: both values must be set (and non-empty) before any request is built
if not (TOKEN and ACCOUNT):
    # flush=True because os._exit() terminates without flushing stdio buffers,
    # which could otherwise drop this message when stdout is redirected.
    print("Please set environment variables for token and account id.\n", flush=True)
    os._exit(1)
class Endpoint:
    """One API endpoint together with the settings used to call it.

    The account ID (sent as the `accountId` query parameter) and the bearer
    token (sent in the `Authorization` header) are read from the module-level
    ACCOUNT and TOKEN values when the instance is created.
    """

    # Pattern for an Ocean cluster ID embedded in the URL, e.g. "o-21167c2b".
    # Kept on the class so the methods do not depend on a script-level global.
    _OCEAN_ID_PATTERN = re.compile(r'(o-[0-9a-z]{8})')

    def __init__(self, url, body=None, timeout=30):
        """API Endpoint. Account and Token set by default

        Arguments:
            url: <class 'str'> representing URL endpoint of API
            body: <class 'str'> valid JSON dumped into a string
            timeout: seconds to wait for the API before giving up
        """
        self.url = url
        self.body = body
        self.timeout = timeout
        self.query_params = {"accountId": ACCOUNT}
        self.headers = {
            'User-Agent': '_Custom_SDK_Agent',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + TOKEN,
        }

    def _cluster_label(self):
        """Return the Ocean cluster ID found in the URL, or the URL itself if none is present."""
        match = self._OCEAN_ID_PATTERN.search(self.url)
        return match.group(1) if match else self.url

    def _request(self, method, starting, finished, data=None):
        """Send one request, print progress, and return the response items.

        Arguments:
            method: HTTP verb passed to requests.request
            starting: text printed (followed by the cluster label) before the call
            finished: text printed (followed by the cluster label) on success
            data: optional request body
        Returns:
            The value stored under ['response']['items'] in the decoded JSON reply.
        On a connection failure, timeout, or non-OK status the error is printed
        and the process exits with status 1.
        """
        label = self._cluster_label()
        print(f"\n{starting} {label}...")
        try:
            result = requests.request(
                method,
                self.url,
                params=self.query_params,
                data=data,
                headers=self.headers,
                timeout=self.timeout,
            )
        except requests.exceptions.RequestException as error:
            # flush=True because os._exit() does not flush stdio buffers.
            print(f"Error: {error}\n", flush=True)
            os._exit(1)
        if result.status_code != requests.codes.ok:
            print(f"Error: {result.reason}\n\n{result.text}\n", flush=True)
            os._exit(1)
        print(f"Success! {finished} {label} -> {result.status_code} {result.reason}")
        return result.json()['response']['items']

    def _GET(self):
        """Send a GET request"""
        return self._request("GET", "Fetching node count for", "Fetched node count for")

    def _PUT(self):
        """Send a PUT request carrying self.body"""
        return self._request(
            "PUT",
            "Detaching Ocean nodes within",
            "Detached nodes for",
            data=self.body,
        )
def split_nodes_by_uptime(nodes, now, uptime_limit):
    """Partition nodes into those past the uptime limit and those still under it.

    Arguments:
        nodes: iterable of dicts, each with an 'instanceId' and a 'createdAt'
            timestamp formatted like '2023-05-22T17:11:00.000Z'
        now: timezone-aware <class 'datetime.datetime'> used as the reference time
        uptime_limit: uptime in seconds at or above which a node is refreshed
    Returns:
        (nodes_to_refresh, nodes_to_ignore): two lists of instance IDs, in input order
    """
    nodes_to_refresh = []
    nodes_to_ignore = []
    for node in nodes:
        # %z accepts the trailing 'Z' and yields a UTC-aware datetime, so the
        # subtraction below is correct regardless of the local time zone.
        created_at = datetime.strptime(node['createdAt'], '%Y-%m-%dT%H:%M:%S.%f%z')
        # Uptime is "now minus creation time"; the reverse is always negative.
        uptime_seconds = (now - created_at).total_seconds()
        if uptime_seconds >= uptime_limit:
            nodes_to_refresh.append(node['instanceId'])
        else:
            nodes_to_ignore.append(node['instanceId'])
    return nodes_to_refresh, nodes_to_ignore


if __name__ == "__main__":
    # Variables: EDIT
    OCEAN_ID = 'o-21167c2b'
    UPTIME_LIMIT = 1209600 # 14 days in seconds

    # Variables: DO NOT EDIT
    # Kept as a module-level name in case other code in this file looks it up.
    ocean_regex = '(o-[0-9a-z]{8})'
    # Timezone-aware "now" so it can be compared with the UTC creation timestamps.
    NOW = datetime.now().astimezone()

    # API URL: Detach Instances
    __base_detach_node_url = f"https://api.spotinst.io/ocean/aws/k8s/cluster/{OCEAN_ID}/detachInstances"

    # Run API Call: Get Current Nodes
    __base_ocean_url = f'https://api.spotinst.io/ocean/aws/k8s/cluster/{OCEAN_ID}/nodes'
    get_ocean_node_count = Endpoint(__base_ocean_url)._GET()

    # Classify every node once, accumulating across the whole list
    nodes_to_refresh, nodes_to_ignore = split_nodes_by_uptime(get_ocean_node_count, NOW, UPTIME_LIMIT)

    if nodes_to_refresh:
        # Body for Detach Nodes API Call
        body = json.dumps({
            "instancesToDetach": nodes_to_refresh,
            "shouldTerminateInstances": True, # Keep this set to true
            "shouldDecrementTargetCapacity": False # Keep this set to false
        })

        # Run API Call: Detach Nodes
        detach_nodes = Endpoint(__base_detach_node_url, body=body)
        detach_nodes._PUT()
        summary = ["\nComplete! Detached Ocean Nodes. Please wait for Ocean to regain capacity."]
    else:
        # Skip the detach call entirely rather than sending an empty instance list
        summary = ["\nComplete! No nodes have reached the uptime limit; nothing was detached."]

    # Report the untouched nodes once, not once per node
    if nodes_to_ignore:
        summary.append(f"Nodes that were not detached: {', '.join(nodes_to_ignore)}")

    # Output: Print to terminal
    print('\n'.join(summary))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment