Skip to content

Instantly share code, notes, and snippets.

@jackylamhk
Created November 6, 2023 15:04
Show Gist options
  • Save jackylamhk/952911c509d9c7c191c1d443bfa30bec to your computer and use it in GitHub Desktop.
Save jackylamhk/952911c509d9c7c191c1d443bfa30bec to your computer and use it in GitHub Desktop.
Remove inactive license assignments for Atlassian
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse

import httpx
# Root logger, configured at DEBUG level.
logger = logging.getLogger()
logging.basicConfig(level=logging.DEBUG)

# --- Configuration: fill these in before running -------------------------
# Organization ID and API key handed to AtlassianOrgAdminClient.
ATLASSIAN_ORG_ID = ""
API_KEY = ""
# Site subdomain (the "<JIRA_DOMAIN>" in https://<JIRA_DOMAIN>.atlassian.net)
# and the basic-auth credentials handed to JiraClient.
JIRA_DOMAIN = ""
USERNAME = ""
API_TOKEN = ""
# Maps a product key (as found in the last-active-dates response) to the ID
# of the group a user is removed from to revoke access to that product.
GROUPS = {
    "jira-software": "",
    "jira-servicedesk": "",
    "jira-product-discovery": "",
    "confluence": "",
}
# Email addresses of users that are never reviewed.
EXEMPTED_USERS = []
# Product keys that are skipped during the review.
FREE_PRODUCTS = ["townsquare", "trello", "opsgenie", "statuspage"]
class AtlassianOrgAdminClient:
    """Minimal async client for the Atlassian organization admin REST API.

    Request paths are relative to
    ``https://api.atlassian.com/admin/v1/orgs/{org_id}`` and every request
    carries the API key as a bearer token.
    """

    def __init__(self, org_id: str, api_key: str):
        self.org_id = org_id
        self._client = httpx.AsyncClient(
            base_url=f"https://api.atlassian.com/admin/v1/orgs/{org_id}",
            headers={"Authorization": f"Bearer {api_key}"},
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    async def get(self, path: str):
        """GET ``path`` and return the ``data`` member of the JSON response.

        Returns:
            The value under ``"data"``, or the raw response body (bytes) when
            the body is not valid JSON.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
            KeyError: The JSON response has no ``"data"`` member.
        """
        resp = await self._client.get(path)
        resp.raise_for_status()
        # Only the decode step may legitimately fail with JSONDecodeError.
        try:
            payload = resp.json()
        except json.JSONDecodeError:
            return resp.content
        return payload["data"]

    async def get_paginated_iter(self, path: str):
        """Yield every item of a cursor-paginated collection at ``path``.

        Follows ``links.next`` until the server stops providing one.

        Raises:
            httpx.HTTPStatusError: A page request returned a 4xx/5xx status.
            ValueError: A ``next`` link carries no ``cursor`` query parameter,
                so pagination cannot continue safely.
        """
        params = {}
        while True:
            resp = await self._client.get(path, params=params)
            resp.raise_for_status()
            page = resp.json()
            for item in page["data"]:
                yield item

            next_url = (page.get("links") or {}).get("next")
            if not next_url:
                return
            # Parse the query string properly: the cursor may not be the first
            # parameter, and it must be percent-decoded because the HTTP
            # client encodes parameter values again when sending them.
            cursors = parse_qs(urlparse(next_url).query).get("cursor")
            if not cursors:
                raise ValueError(
                    f"Pagination link has no cursor parameter: {next_url!r}"
                )
            params["cursor"] = cursors[0]

    async def get_users_iter(self):
        """Return an async iterator over the organization's users.

        This is a coroutine returning an async generator, so callers must
        ``await`` it before using ``async for``.
        """
        return self.get_paginated_iter("/users")

    async def get_last_active_dates(self, account_id: str):
        """Return the last-active-dates payload for ``account_id``."""
        return await self.get(f"/directory/users/{account_id}/last-active-dates")

    async def get_user(self, account_id: str):
        """Return the user record for ``account_id``."""
        return await self.get(f"/users/{account_id}")
class JiraClient:
    """Minimal async client for the Jira Cloud REST API (v3).

    Request paths are relative to
    ``https://{jira_domain}.atlassian.net/rest/api/3`` and authenticated with
    HTTP basic auth (username + API token).
    """

    def __init__(self, jira_domain: str, username: str, api_token: str):
        self.jira_domain = jira_domain
        self.username = username
        self._client = httpx.AsyncClient(
            base_url=f"https://{jira_domain}.atlassian.net/rest/api/3",
            auth=httpx.BasicAuth(username, api_token),
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    async def get_paginated_iter(self, path: str, page_size: int = 1000):
        """Yield every item of an offset-paginated list endpoint.

        Pages are requested with ``startAt``/``maxResults`` until the server
        returns an empty page.

        Args:
            path: Endpoint path relative to the client's base URL.
            page_size: Value sent as ``maxResults`` and the amount ``startAt``
                advances per page.

        Raises:
            httpx.HTTPStatusError: A page request returned a 4xx/5xx status.
        """
        params = {"startAt": 0, "maxResults": page_size}
        while True:
            resp = await self._client.get(path, params=params)
            resp.raise_for_status()
            page = resp.json()
            if not page:
                return
            for item in page:
                yield item
            # NOTE(review): advances by the requested window, not by
            # len(page); confirm the endpoint never caps maxResults below
            # page_size, otherwise items would be skipped.
            params["startAt"] += page_size

    async def delete(self, path: str, params: dict):
        """Send a DELETE request and return the raw response body.

        Raises:
            ValueError: The server answered with a 4xx/5xx status; the
                original ``httpx.HTTPStatusError`` is attached as the cause.
        """
        resp = await self._client.delete(path, params=params)
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as err:
            raise ValueError(f"Failed to delete: {resp.content}") from err
        return resp.content

    async def remove_user_from_group(self, group_id: str, account_id: str):
        """Remove the account ``account_id`` from the group ``group_id``."""
        return await self.delete(
            "/group/user", {"groupId": group_id, "accountId": account_id}
        )

    async def get_users_iter(self):
        """Return an async iterator over the site's users.

        This is a coroutine returning an async generator, so callers must
        ``await`` it before using ``async for``.
        """
        return self.get_paginated_iter("/users")
# Module-level client instances shared by the review coroutines in this file.
admin_client = AtlassianOrgAdminClient(ATLASSIAN_ORG_ID, API_KEY)
jira_client = JiraClient(JIRA_DOMAIN, USERNAME, API_TOKEN)
# Inactivity cutoff: 90 days before the moment this module is loaded. It is a
# naive datetime, and it is compared against naive datetimes produced by
# strptime in _review_atlassian_product_access.
# NOTE(review): the name is misspelled ("NINTY"); it is referenced elsewhere
# in this file, so any rename has to be done everywhere at once.
NINTY_DAYS_AGO = datetime.now() - timedelta(days=90)
async def _review_atlassian_product_access(user):
    """Revoke one user's access to paid products left unused for 90 days.

    Looks up the user's per-product last-active dates through ``admin_client``
    and, for each product that is not in ``FREE_PRODUCTS`` and was last used
    before ``NINTY_DAYS_AGO``, removes the user from the group mapped to that
    product in ``GROUPS`` through ``jira_client``.

    Every failure that concerns a single user or product is logged and
    skipped instead of raised: this coroutine runs inside a TaskGroup, where
    one escaping exception would cancel the review of every other user.

    Args:
        user: Jira user record. ``accountId`` identifies the account;
            ``displayName`` is only used in log messages.
    """
    account_id = user.get("accountId")
    display_name = user.get("displayName", account_id)
    if not account_id:
        logger.warning(f"{display_name}: User record has no accountId, skipping")
        return

    try:
        product_access = await admin_client.get_last_active_dates(account_id)
    except httpx.HTTPError as e:
        logger.error(f"{display_name}: Failed to fetch last active dates: {e}")
        return

    # The admin client returns raw bytes for a non-JSON body, and the
    # "product_access" member may be absent or null.
    products = (
        product_access.get("product_access")
        if isinstance(product_access, dict)
        else None
    )
    for product in products or []:
        product_key = product["key"]
        if product_key in FREE_PRODUCTS:
            logger.debug(f"{display_name}:{product_key}: Free product, skipping")
            continue

        # A missing or null date means "never active": use the epoch so the
        # product counts as inactive.
        raw_last_active = product.get("last_active") or "1970-01-01"
        try:
            last_active = datetime.strptime(raw_last_active, "%Y-%m-%d")
        except (TypeError, ValueError):
            # Fail safe: never revoke access based on a date we cannot read.
            logger.warning(
                f"{display_name}:{product_key}: Unrecognised last active date {raw_last_active!r}, skipping"
            )
            continue

        if last_active >= NINTY_DAYS_AGO:
            logger.debug(
                f"{display_name}:{product_key}: Active within 90 days, skipping. Last active: {last_active.strftime('%Y-%m-%d')}"
            )
            continue

        # Paid products without a configured group cannot be revoked here.
        group_id = GROUPS.get(product_key)
        if not group_id:
            logger.warning(
                f"{display_name}:{product_key}: No group configured for product, skipping"
            )
            continue

        logger.info(
            f"{display_name}:{product_key}: Last seen {last_active}. Removing product access"
        )
        try:
            await jira_client.remove_user_from_group(group_id, account_id)
        except (ValueError, httpx.HTTPError) as e:
            logger.error(f"Failed to remove user: {e}")
async def review_atlassian_access(max_concurrency: int = 10):
    """Review product access for every eligible Jira user.

    Iterates the users returned by ``jira_client`` and schedules
    ``_review_atlassian_product_access`` for each one, skipping deactivated
    users, users whose email address is in ``EXEMPTED_USERS``, and app
    accounts.

    Args:
        max_concurrency: Maximum number of users reviewed at the same time.
            Without a bound, one task per user is started at once, which can
            exhaust the HTTP client's connection pool and time out requests
            while they wait for a free connection.

    Raises:
        ValueError: ``max_concurrency`` is smaller than 1.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")
    limiter = asyncio.Semaphore(max_concurrency)

    async def _review_with_limit(user: dict) -> None:
        # Hold a slot for the whole review of one user.
        async with limiter:
            await _review_atlassian_product_access(user)

    users = await jira_client.get_users_iter()
    async with asyncio.TaskGroup() as tg:
        async for user in users:
            # displayName is only used for logging; never fail on its absence.
            display_name = user.get("displayName", user.get("accountId"))
            if not user.get("active"):
                logger.debug(f"Skipped {display_name}: Deactivated user, skipping")
                continue
            if user.get("emailAddress") in EXEMPTED_USERS:
                logger.debug(f"Skipped {display_name}: Exempted user, skipping")
                continue
            if user.get("accountType") == "app":
                logger.debug(f"Skipped {display_name}: OAuth client, skipping")
                continue
            tg.create_task(_review_with_limit(user))
    logger.info("Atlassian access review completed")


# Script entry point: run the review once when executed directly.
if __name__ == "__main__":
    asyncio.run(review_atlassian_access())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment