Created
November 6, 2023 15:10
-
-
Save jackylamhk/22c229605e9219e2c583f762ece2fba5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httpx | |
import jwt | |
from json import JSONDecodeError | |
from datetime import datetime, timezone, timedelta | |
from urllib.parse import urlunsplit, urlencode | |
class InsidedAdminError(Exception): | |
pass | |
class InSidedAdmin: | |
API_BASE_URLS = { | |
"eu": "https://api2-eu-west-1.insided.com", | |
"us": "https://api2-us-west-2.insided.com", | |
"staging": "https://api2-eu-west-1.almostinsided.com", | |
} | |
SSO_DOMAINS = { | |
"eu": "sso.api.insided.com", | |
"us": "sso.api.insided.com", | |
"staging": "sso.api.almostinsided.com", | |
} | |
def __init__( | |
self, env_tag: str, client_id: str, client_secret: str, domain: str = None | |
): | |
if env_tag not in self.API_BASE_URLS.keys(): | |
raise RuntimeError( | |
f"Unrecognized environment tag: '{str(env_tag)}'. \ | |
Allowed tags: {list(self.API_BASE_URLS.keys())}" | |
) | |
self._domain = domain | |
self._env_tag = env_tag | |
self._api_url = self.API_BASE_URLS.get(env_tag) | |
self._sso_domain = self.SSO_DOMAINS.get(env_tag) | |
self._client_id = client_id | |
self._client_secret = client_secret | |
self.token_exp = 0 | |
self.start_session() | |
@property | |
def domain(self) -> str: | |
return self._domain | |
@property | |
def api_url(self) -> str: | |
return self._api_url | |
@property | |
def sso_domain(self) -> str: | |
return self._sso_domain | |
def start_session(self): | |
self.session = httpx.AsyncClient() | |
async def fetch_token(self): | |
self.session.headers.pop("Authorization", None) | |
payload = { | |
"client_id": self._client_id, | |
"client_secret": self._client_secret, | |
"grant_type": "client_credentials", | |
"scope": "read write", | |
} | |
response = await self.post(path="/oauth2/token", json=payload) | |
self.token = response["access_token"] | |
self.token_exp = datetime.now().timestamp() + response["expires_in"] | |
self.session.headers["Authorization"] = f"Bearer {self.token}" | |
async def refresh_token(self): | |
if self.token_exp - 60 < datetime.now().timestamp(): | |
await self.fetch_token() | |
async def get(self, path: str, params: str = None) -> dict | None: | |
await self.refresh_token() | |
request = await self.session.get(f"{self.api_url}{path}", params=params) | |
try: | |
request.raise_for_status() | |
except httpx.HTTPError as e: | |
raise InsidedAdminError( | |
f"InSidedAdmin Error: {e}, content: {request.content}" | |
) | |
return request.json() | |
async def post(self, path: str, json: dict) -> dict | None: | |
if path != "/oauth2/token": | |
await self.refresh_token() | |
request = await self.session.post(f"{self.api_url}{path}", json=json) | |
try: | |
request.raise_for_status() | |
return request.json() | |
except httpx.HTTPError as e: | |
raise InsidedAdminError( | |
f"InSidedAdmin Error: {e}, content: {request.content}" | |
) | |
except JSONDecodeError as e: | |
return None | |
async def put(self, path: str, json: dict = None) -> dict: | |
await self.refresh_token() | |
request = await self.session.put(f"{self.api_url}{path}", json=json) | |
try: | |
request.raise_for_status() | |
except httpx.HTTPError as e: | |
raise InsidedAdminError( | |
f"InSidedAdmin Error: {e}, content: {request.content}" | |
) | |
return request.json() | |
# Set the class user by creating one. If user is already created, the API will return its info | |
async def create_user(self, payload: dict) -> dict: | |
""" | |
Sample payload: { | |
"email": "user@insided.com", | |
"username": "user", | |
"password": "asdTW$sdjksd", | |
"user_role": [ | |
"roles.registered" | |
], | |
"profile_field": { | |
"12": "Smith" | |
}, | |
"sso": { | |
"facebook": "333ABBDDD", | |
"oauth2": "4545ag63274" | |
} | |
} | |
""" | |
user_info = await self.post(path="/user/register", json={"data": payload}) | |
try: | |
return user_info["user"] | |
except KeyError as e: | |
raise InsidedAdminError( | |
"InSidedAdmin Error: Unable to load User ID/Login Token from response: " | |
+ str(e) | |
) | |
async def get_user_by_field(self, field: str, value: str) -> dict: | |
return await self.get(path=f"/user/{field}/{value}") | |
async def update_user_custom_field(self, user_id: str, field: str, value: str): | |
return await self.put(path=f"/user/{user_id}/profile_field/{field}/{value}") | |
async def add_role_to_user(self, user_id: str, roles: list): | |
return await self.post( | |
path=f"/user/{user_id}/role", json={"data": {"user_role": roles}} | |
) | |
def user_signin_uri( | |
self, email: str, username: str, secret: str, validity: int | |
) -> str: | |
""" | |
Generates a Signin URI for Insided. Please note for staging, | |
the domain requires the suffix "-staging" or "-sandbox". | |
validity: The signed JWT's validity (in minutes) | |
""" | |
# Sign new JWT for Insided | |
payload = { | |
"exp": datetime.now(tz=timezone.utc) + timedelta(minutes=validity), | |
"id": email, | |
"email": email, | |
"username": username, | |
} | |
token = jwt.encode(payload, secret, "ES256") | |
params = {"token": token, "customer": self.domain} | |
signin_uri = urlunsplit( | |
("https", self.sso_domain, "/auth/token/return", urlencode(params), "") | |
) | |
return signin_uri |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment