Created
November 6, 2023 15:04
-
-
Save jackylamhk/952911c509d9c7c191c1d443bfa30bec to your computer and use it in GitHub Desktop.
Remove inactive license assignments for Atlassian
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qs, urlparse

import httpx
# --- Logging ---------------------------------------------------------------
# The script logs through the root logger, configured at DEBUG level.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

# --- Atlassian organization admin API credentials --------------------------
# Organization id interpolated into the admin API base URL, and the key sent
# as a bearer token.
ATLASSIAN_ORG_ID = ""
API_KEY = ""

# --- Jira Cloud REST API credentials ---------------------------------------
# Site sub-domain (https://<JIRA_DOMAIN>.atlassian.net) and the basic-auth
# username / API token pair.
JIRA_DOMAIN = ""
USERNAME = ""
API_TOKEN = ""

# Product key -> id of the group a user is removed from when access to that
# product is revoked. Fill in the group ids before running.
GROUPS = {
    "jira-software": "",
    "jira-servicedesk": "",
    "jira-product-discovery": "",
    "confluence": "",
}

# Email addresses of users that are never reviewed.
EXEMPTED_USERS = []

# Product keys that are skipped during the review because they are free.
FREE_PRODUCTS = ["townsquare", "trello", "opsgenie", "statuspage"]
class AtlassianOrgAdminClient:
    """Small async client for the Atlassian organization admin REST API.

    All requests are sent relative to
    ``https://api.atlassian.com/admin/v1/orgs/{org_id}`` and carry the API
    key as a bearer token.
    """

    def __init__(self, org_id: str, api_key: str):
        """Create the underlying ``httpx.AsyncClient`` for *org_id*."""
        self.org_id = org_id
        self._client = httpx.AsyncClient(
            base_url=f"https://api.atlassian.com/admin/v1/orgs/{org_id}",
            headers={"Authorization": f"Bearer {api_key}"},
        )

    async def get(self, path: str):
        """GET *path* and return the ``"data"`` member of the JSON response.

        Returns the raw response bytes instead when the body is not valid
        JSON. Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
        """
        resp = await self._client.get(path)
        resp.raise_for_status()
        try:
            payload = resp.json()
        except json.JSONDecodeError:
            return resp.content
        return payload["data"]

    async def get_paginated_iter(self, path: str):
        """Yield every item of the ``"data"`` lists of a cursor-paginated GET.

        After each page the ``cursor`` query parameter is read from the
        ``links.next`` URL and sent with the following request; iteration
        stops when the response carries no ``next`` link.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response.
            ValueError: if a ``next`` link is present but holds no cursor.
        """
        params = {}
        while True:
            resp = await self._client.get(path, params=params)
            resp.raise_for_status()
            page = resp.json()
            for item in page["data"]:
                yield item

            next_url = (page.get("links") or {}).get("next")
            if not next_url:
                return

            # Parse the query string properly: this tolerates additional
            # parameters next to ``cursor`` and percent-decodes the value, so
            # it is not encoded a second time when httpx serialises ``params``.
            cursor_values = parse_qs(urlparse(next_url).query).get("cursor")
            if not cursor_values:
                # Without a cursor the same page would be requested forever.
                raise ValueError(f"Pagination link has no cursor: {next_url}")
            params = {"cursor": cursor_values[0]}

    async def get_users_iter(self):
        """Return an async iterator over ``/users``.

        This is a coroutine: ``await`` it first, then ``async for`` over the
        result.
        """
        return self.get_paginated_iter("/users")

    async def get_last_active_dates(self, account_id: str):
        """Return the last-active-dates data for *account_id*."""
        return await self.get(f"/directory/users/{account_id}/last-active-dates")

    async def get_user(self, account_id: str):
        """Return the user data for *account_id*."""
        return await self.get(f"/users/{account_id}")
class JiraClient:
    """Small async client for the Jira Cloud REST API (v3) of one site.

    Requests are sent relative to
    ``https://{jira_domain}.atlassian.net/rest/api/3`` using HTTP basic auth.
    """

    def __init__(self, jira_domain: str, username: str, api_token: str):
        """Store the site/user and build the underlying ``httpx.AsyncClient``."""
        self.jira_domain = jira_domain
        self.username = username
        credentials = httpx.BasicAuth(username, api_token)
        self._client = httpx.AsyncClient(
            base_url=f"https://{jira_domain}.atlassian.net/rest/api/3",
            auth=credentials,
        )

    async def get_paginated_iter(self, path: str):
        """Yield every item of an offset-paginated GET on *path*.

        Pages of up to 1000 items are requested via ``startAt`` /
        ``maxResults``; the offset advances by the page size after each
        non-empty page and iteration ends at the first empty response.
        Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
        """
        page_size = 1000
        offset = 0
        while True:
            query = {"startAt": offset, "maxResults": page_size}
            resp = await self._client.get(path, params=query)
            resp.raise_for_status()
            page = resp.json()
            for item in page:
                yield item
            if not page:
                return
            offset += page_size

    async def delete(self, path: str, params: dict):
        """Send a DELETE to *path* and return the raw response body.

        Raises ``ValueError`` (carrying the response body) when the server
        answers with a 4xx/5xx status.
        """
        try:
            resp = await self._client.delete(path, params=params)
            resp.raise_for_status()
        except httpx.HTTPStatusError:
            raise ValueError(f"Failed to delete: {resp.content}")
        else:
            return resp.content

    async def remove_user_from_group(self, group_id: str, account_id: str):
        """Remove the account *account_id* from the group *group_id*."""
        query = {"groupId": group_id, "accountId": account_id}
        return await self.delete("/group/user", query)

    async def get_users_iter(self):
        """Return an async iterator over ``/users``.

        This is a coroutine: ``await`` it first, then ``async for`` over the
        result.
        """
        return self.get_paginated_iter("/users")
# Shared client instances, built once at import time from the configuration
# constants defined earlier in this file.
admin_client = AtlassianOrgAdminClient(org_id=ATLASSIAN_ORG_ID, api_key=API_KEY)
jira_client = JiraClient(
    jira_domain=JIRA_DOMAIN,
    username=USERNAME,
    api_token=API_TOKEN,
)

# Inactivity cutoff: a product last used before this moment counts as
# inactive. Naive local time, evaluated once when the module is loaded.
NINTY_DAYS_AGO = datetime.now() - timedelta(days=90)
async def _review_atlassian_product_access(user):
    """Revoke *user*'s access to every paid product unused for 90 days.

    Fetches the user's per-product last-active dates from the admin API and,
    for each product that is not in ``FREE_PRODUCTS`` and was last active
    before ``NINTY_DAYS_AGO``, removes the user from the group mapped to that
    product in ``GROUPS``. A product with no recorded activity is treated as
    last active on 1970-01-01, i.e. as inactive.

    Args:
        user: Jira user dict; ``accountId`` and ``displayName`` are read.

    Failed removals (``ValueError`` from the Jira client) are logged and do
    not stop the review of the remaining products.
    """
    account_id = user.get("accountId")
    display_name = user.get("displayName", account_id)

    activity = await admin_client.get_last_active_dates(account_id)
    if not isinstance(activity, dict):
        # The admin client hands back raw bytes when the body is not JSON.
        logger.warning(f"{display_name}: Unexpected last-active response, skipping")
        return

    # ``or []`` covers both a missing key and an explicit null.
    for product in activity.get("product_access") or []:
        product_key = product["key"]
        if product_key in FREE_PRODUCTS:
            logger.debug(f"{display_name}:{product_key}: Free product, skipping")
            continue

        # ``or`` rather than a .get() default: the key may be present but null.
        last_active = datetime.strptime(
            product.get("last_active") or "1970-01-01", "%Y-%m-%d"
        )
        if last_active >= NINTY_DAYS_AGO:
            logger.debug(
                f"{display_name}:{product_key}: Active within 90 days, skipping. Last active: {last_active.strftime('%Y-%m-%d')}"
            )
            continue

        group_id = GROUPS.get(product_key)
        if group_id is None:
            # An unmapped product must not raise: an uncaught KeyError here
            # would abort every other review task in the caller's TaskGroup.
            logger.warning(
                f"{display_name}:{product_key}: No group configured for this product, skipping"
            )
            continue

        logger.info(
            f"{display_name}:{product_key}: Last seen {last_active}. Removing product access"
        )
        try:
            await jira_client.remove_user_from_group(group_id, account_id)
        except ValueError as e:
            logger.error(f"Failed to remove user: {e}")
async def review_atlassian_access():
    """Review product access for every eligible Jira user concurrently.

    Iterates over all users returned by the Jira client and schedules one
    review task per user in a task group. Deactivated users, users whose
    email address is listed in ``EXEMPTED_USERS`` and accounts of type
    ``"app"`` are skipped. Returns once all scheduled tasks have finished.
    """
    all_users = await jira_client.get_users_iter()
    async with asyncio.TaskGroup() as task_group:
        async for user in all_users:
            # Work out why (if at all) this user is excluded from the review.
            if not user.get("active"):
                skip_reason = "Deactivated user"
            elif user.get("emailAddress") in EXEMPTED_USERS:
                skip_reason = "Exempted user"
            elif user.get("accountType") == "app":
                skip_reason = "OAuth client"
            else:
                skip_reason = None

            if skip_reason is not None:
                logger.debug(f"Skipped {user['displayName']}: {skip_reason}, skipping")
                continue

            task_group.create_task(_review_atlassian_product_access(user))
    logger.info("Atlassian access review completed")


if __name__ == "__main__":
    asyncio.run(review_atlassian_access())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment