Skip to content

Instantly share code, notes, and snippets.

@gitrgoliveira
Last active July 1, 2024 11:45
Show Gist options
  • Save gitrgoliveira/1b5197fb0637d03fdcadc4f2464a1c95 to your computer and use it in GitHub Desktop.
Save gitrgoliveira/1b5197fb0637d03fdcadc4f2464a1c95 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
# List the runs created within a given time window for every workspace in a TFE organization
# NB: skeleton code ... only checks for basic errors in responses
# Reads inputs from env vars or interactively
import argparse
from getpass import getpass
import os
import requests
import threading
import time
# Number of items requested per page from the paginated API endpoints
PAGESIZE = 100
# Timestamp layout (strptime-style directives); not referenced elsewhere in the visible code
TS_FORMAT = '%Y-%m-%dT%H:%M:%S%z'

# The time window is given as two required positional arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    'start_time',
    help='Start date & time in the format: 2022-10-04T14:29:09',
)
parser.add_argument(
    'end_time',
    help='End date & time in the format: 2022-10-04T14:29:09',
)
args = parser.parse_args()
def envorask(var, sensitive=False):
    """Return the value of environment variable 'var', prompting if it is unset.

    When the variable is not present in the environment, the value is read
    interactively using the prompt '<var>: '. With sensitive=True the prompt
    goes through getpass instead of input.
    """
    from_env = os.environ.get(var)
    if from_env is None:
        # Pick the prompt function once, then ask with the same prompt text
        ask = getpass if sensitive else input
        return ask(f'{var}: ')
    return from_env
# Shared rate-limiter state: a mutex, and the number of requests counted
# since the counter was last reset. Always touch the counter under the lock.
lock = threading.Lock()
request_counter = 0
def rate_limit():
    """Block until one more request may be issued (at most 15 per counter window).

    Every call that returns has been counted in the module-level
    ``request_counter``. While the counter is at the limit, the function
    waits for the counter to be reset (the file starts a background thread
    running ``reset_counter`` for that purpose) and re-checks periodically.

    The lock is only held while the counter is inspected or updated, never
    while sleeping, so the reset thread and other callers are not stalled.
    If no reset is observed within 2 seconds (the stall the previous
    implementation always paid), a fresh window is started here and the
    current request is counted as its first entry.
    """
    global request_counter
    max_requests = 15     # requests allowed per counter window
    max_wait = 2.0        # seconds to wait for an external reset before self-resetting
    poll_interval = 0.01  # seconds between checks while the window is full
    deadline = None
    while True:
        with lock:
            if request_counter < max_requests:
                request_counter += 1
                return
            now = time.monotonic()
            if deadline is None:
                deadline = now + max_wait
            elif now >= deadline:
                # Nobody reset the counter for us: open a new window and
                # count this request in it.
                request_counter = 1
                return
        # Sleep outside the lock so the counter can be reset meanwhile
        time.sleep(poll_interval)
def reset_counter():
    """Zero the shared request counter once per second, forever.

    Never returns; intended to be run as the target of a background thread.
    """
    global request_counter
    interval_seconds = 1
    while True:
        # Hold the lock only for the assignment, not for the pause
        with lock:
            request_counter = 0
        time.sleep(interval_seconds)
# Run the endless counter reset loop in a daemon thread so that it does not
# keep the interpreter alive once the main script has finished
_counter_reset_thread = threading.Thread(target=reset_counter, daemon=True)
_counter_reset_thread.start()
def getallpagedata(url, headers, timeout=30):
    """Collect the 'data' entries from every page of a paginated API listing.

    Starting at ``url``, each page is fetched with a rate-limited GET, its
    'data' list is appended to the result, and the walk continues with the
    URL found under 'links' -> 'next'. It stops at the first page that has
    no 'links' object or no next link.

    Pages are walked in a loop rather than by recursion, so the number of
    pages is not bounded by the interpreter's recursion limit.

    Args:
        url: URL of the first page.
        headers: HTTP headers sent with every request.
        timeout: Value passed to ``requests.get`` as ``timeout`` (seconds),
            so a stalled connection raises instead of hanging forever.

    Returns:
        A list with the items of all pages in page order; empty when the
        API returned no data.

    Raises:
        requests.HTTPError: if a response has an error status
            (via ``raise_for_status``).
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    items = []
    while url:
        # print(f'getting page: {url}')
        rate_limit()  # Enforce rate limit
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
        # A missing or null 'data' / 'links' member is treated as empty
        items.extend(payload.get('data') or [])
        url = (payload.get('links') or {}).get('next')
    return items
def getrundata(url, headers, start_time, timeout=30):
    """Collect run entries page by page, stopping once a page reaches past start_time.

    Each page is fetched with a rate-limited GET and its 'data' list is
    appended to the result. No further page is requested when the page is
    empty, when its last entry was created before ``start_time``, or when
    there is no 'links' -> 'next' URL. The page that triggers the stop is
    still included in full, so callers must filter by time themselves.

    NOTE(review): the early stop only saves requests if the API lists runs
    newest first; confirm against the API documentation.

    Pages are walked in a loop rather than by recursion, so the number of
    pages is not bounded by the interpreter's recursion limit.

    Args:
        url: URL of the first page.
        headers: HTTP headers sent with every request.
        start_time: Timestamp string; compared as a plain string against
            each page's last 'created-at' attribute.
        timeout: Value passed to ``requests.get`` as ``timeout`` (seconds),
            so a stalled connection raises instead of hanging forever.

    Returns:
        A list with the items of all fetched pages in page order; empty
        when the API returned no data.

    Raises:
        requests.HTTPError: if a response has an error status
            (via ``raise_for_status``).
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    runs = []
    while url:
        # print(f'getting page: {url}')
        rate_limit()  # Enforce rate limit
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
        # A missing or null 'data' member is treated as an empty page
        page = payload.get('data') or []
        runs.extend(page)
        if not page:
            break
        if page[-1]['attributes']['created-at'] < start_time:
            # Last run on this page predates the window: later pages are not needed
            break
        url = (payload.get('links') or {}).get('next')
    return runs
# Connection settings: taken from the environment when set, otherwise asked
# for interactively (the token prompt is the sensitive one)
TFE_URL = envorask('TFE_URL')
TFE_ORG = envorask('TFE_ORG')
TFE_TOKEN = envorask('TFE_TOKEN', sensitive=True)

headers = {
    "Content-Type": "application/vnd.api+json",
    "Authorization": f"Bearer {TFE_TOKEN}",
}

# Fetch every workspace of the organization
workspaces_url = f"{TFE_URL}/api/v2/organizations/{TFE_ORG}/workspaces?page%5Bsize%5d={PAGESIZE}"
wslist = getallpagedata(workspaces_url, headers)

# Keep only (id, name) for each workspace
wsids = [(ws['id'], ws['attributes']['name']) for ws in wslist]

# Header line, then one line per run created strictly inside the time window
print("workspace, run_id, created-at")
for wsid, wsname in wsids:
    runs_url = f"{TFE_URL}/api/v2/workspaces/{wsid}/runs?page%5Bsize%5d={PAGESIZE}"
    for run in getrundata(runs_url, headers, args.start_time):
        created_at = run['attributes']['created-at']
        if not (args.start_time < created_at < args.end_time):
            continue
        print(f"{wsname}, {run['id']}, {created_at}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment