Skip to content

Instantly share code, notes, and snippets.

@jackylamhk
Created November 6, 2023 15:10
Show Gist options
  • Save jackylamhk/22c229605e9219e2c583f762ece2fba5 to your computer and use it in GitHub Desktop.
Save jackylamhk/22c229605e9219e2c583f762ece2fba5 to your computer and use it in GitHub Desktop.
import httpx
import jwt
from json import JSONDecodeError
from datetime import datetime, timezone, timedelta
from urllib.parse import urlunsplit, urlencode
class InsidedAdminError(Exception):
pass
class InSidedAdmin:
API_BASE_URLS = {
"eu": "https://api2-eu-west-1.insided.com",
"us": "https://api2-us-west-2.insided.com",
"staging": "https://api2-eu-west-1.almostinsided.com",
}
SSO_DOMAINS = {
"eu": "sso.api.insided.com",
"us": "sso.api.insided.com",
"staging": "sso.api.almostinsided.com",
}
def __init__(
self, env_tag: str, client_id: str, client_secret: str, domain: str = None
):
if env_tag not in self.API_BASE_URLS.keys():
raise RuntimeError(
f"Unrecognized environment tag: '{str(env_tag)}'. \
Allowed tags: {list(self.API_BASE_URLS.keys())}"
)
self._domain = domain
self._env_tag = env_tag
self._api_url = self.API_BASE_URLS.get(env_tag)
self._sso_domain = self.SSO_DOMAINS.get(env_tag)
self._client_id = client_id
self._client_secret = client_secret
self.token_exp = 0
self.start_session()
@property
def domain(self) -> str:
return self._domain
@property
def api_url(self) -> str:
return self._api_url
@property
def sso_domain(self) -> str:
return self._sso_domain
def start_session(self):
self.session = httpx.AsyncClient()
async def fetch_token(self):
self.session.headers.pop("Authorization", None)
payload = {
"client_id": self._client_id,
"client_secret": self._client_secret,
"grant_type": "client_credentials",
"scope": "read write",
}
response = await self.post(path="/oauth2/token", json=payload)
self.token = response["access_token"]
self.token_exp = datetime.now().timestamp() + response["expires_in"]
self.session.headers["Authorization"] = f"Bearer {self.token}"
async def refresh_token(self):
if self.token_exp - 60 < datetime.now().timestamp():
await self.fetch_token()
async def get(self, path: str, params: str = None) -> dict | None:
await self.refresh_token()
request = await self.session.get(f"{self.api_url}{path}", params=params)
try:
request.raise_for_status()
except httpx.HTTPError as e:
raise InsidedAdminError(
f"InSidedAdmin Error: {e}, content: {request.content}"
)
return request.json()
async def post(self, path: str, json: dict) -> dict | None:
if path != "/oauth2/token":
await self.refresh_token()
request = await self.session.post(f"{self.api_url}{path}", json=json)
try:
request.raise_for_status()
return request.json()
except httpx.HTTPError as e:
raise InsidedAdminError(
f"InSidedAdmin Error: {e}, content: {request.content}"
)
except JSONDecodeError as e:
return None
async def put(self, path: str, json: dict = None) -> dict:
await self.refresh_token()
request = await self.session.put(f"{self.api_url}{path}", json=json)
try:
request.raise_for_status()
except httpx.HTTPError as e:
raise InsidedAdminError(
f"InSidedAdmin Error: {e}, content: {request.content}"
)
return request.json()
# Set the class user by creating one. If user is already created, the API will return its info
async def create_user(self, payload: dict) -> dict:
"""
Sample payload: {
"email": "user@insided.com",
"username": "user",
"password": "asdTW$sdjksd",
"user_role": [
"roles.registered"
],
"profile_field": {
"12": "Smith"
},
"sso": {
"facebook": "333ABBDDD",
"oauth2": "4545ag63274"
}
}
"""
user_info = await self.post(path="/user/register", json={"data": payload})
try:
return user_info["user"]
except KeyError as e:
raise InsidedAdminError(
"InSidedAdmin Error: Unable to load User ID/Login Token from response: "
+ str(e)
)
async def get_user_by_field(self, field: str, value: str) -> dict:
return await self.get(path=f"/user/{field}/{value}")
async def update_user_custom_field(self, user_id: str, field: str, value: str):
return await self.put(path=f"/user/{user_id}/profile_field/{field}/{value}")
async def add_role_to_user(self, user_id: str, roles: list):
return await self.post(
path=f"/user/{user_id}/role", json={"data": {"user_role": roles}}
)
def user_signin_uri(
self, email: str, username: str, secret: str, validity: int
) -> str:
"""
Generates a Signin URI for Insided. Please note for staging,
the domain requires the suffix "-staging" or "-sandbox".
validity: The signed JWT's validity (in minutes)
"""
# Sign new JWT for Insided
payload = {
"exp": datetime.now(tz=timezone.utc) + timedelta(minutes=validity),
"id": email,
"email": email,
"username": username,
}
token = jwt.encode(payload, secret, "ES256")
params = {"token": token, "customer": self.domain}
signin_uri = urlunsplit(
("https", self.sso_domain, "/auth/token/return", urlencode(params), "")
)
return signin_uri
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment