Skip to content

Instantly share code, notes, and snippets.

@taco-shellcode
Created February 12, 2021 15:35
Show Gist options
  • Save taco-shellcode/baa69f0b097694a6eb040ef9f5b17f90 to your computer and use it in GitHub Desktop.
Save taco-shellcode/baa69f0b097694a6eb040ef9f5b17f90 to your computer and use it in GitHub Desktop.
import base64
import datetime
import urllib
import urllib.parse

import requests
class Session(object):
    """Process-wide singleton wrapping a Jamf Pro API bearer-token session.

    ``Session(...)`` never returns a ``Session`` object: ``__new__`` returns
    the single shared ``Session._Session`` instance, creating it on first use
    and refreshing its token through the keep-alive endpoint when the token
    is close to expiring.

    Typical use::

        session = Session.authenticate(jamf_domain, username, password)
        ...
        session = Session()      # same object, refreshed if needed
        ...
        Session.invalidate()
    """

    # The one shared _Session object, or None when no session is active.
    __instance = None

    # Accepted layouts of the "expires" field returned by the auth endpoints.
    # The trailing "Z" marks the value as UTC. The second layout tolerates a
    # timestamp that carries no fractional seconds.
    _EXPIRES_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

    # A token is refreshed once it is within this window of its expiry.
    _REFRESH_WINDOW = datetime.timedelta(minutes=3)

    # Seconds to wait on any HTTP call before giving up; without a timeout a
    # stalled server would block the caller forever.
    _REQUEST_TIMEOUT = 30

    class _Session(object):
        """State and helpers of the shared session.

        Attributes:
            jamf_domain: Base URL that API endpoints are joined onto.
            username: Account name used for basic authentication.
            password: Account password used for basic authentication.
            api_token: Current bearer token sent on API calls.
            token_expiration: Token expiry as epoch milliseconds.
            token_is_expired: Set to True once the token was invalidated.
        """

        def __init__(self, jamf_domain: str, username: str, password: str, api_token: str, token_expiration: int):
            self.jamf_domain = jamf_domain
            self.username = username
            self.password = password
            self.api_token = api_token
            self.token_expiration = int(token_expiration)
            self.token_is_expired = False

        def _call_api_endpoint(self, api_endpoint: str, method="POST") -> "requests.Response":
            """Call ``api_endpoint`` on the Jamf domain with the bearer token.

            Args:
                api_endpoint: Path joined onto ``jamf_domain``.
                method: HTTP verb to use; defaults to ``"POST"``.

            Returns:
                The ``requests`` response object, whatever its status code.
            """
            api_url = urllib.parse.urljoin(self.jamf_domain, api_endpoint)
            # requests.request honours ``method``; the previous code always
            # issued a POST regardless of the argument.
            return requests.request(
                method=method,
                url=api_url,
                headers={
                    "accept": "application/json",
                    "authorization": f"Bearer {self.api_token}",
                },
                timeout=Session._REQUEST_TIMEOUT,
            )

        def _get_refreshed_session(self):
            """Swap the token for a fresh one via the keep-alive endpoint.

            Best effort: when the endpoint does not answer with HTTP 200 the
            token and expiry are left untouched.

            Returns:
                This same object, so the call can be assigned or chained.
            """
            api_response = self._call_api_endpoint("/api/v1/auth/keep-alive")
            if api_response.status_code == 200:
                # Decode the body only on success; an error response is not
                # guaranteed to carry JSON.
                payload = api_response.json()
                self.api_token = payload["token"]
                self.token_expiration = Session._parse_expiration_ms(payload["expires"])
            return self

        @property
        def is_expiring(self) -> bool:
            """True when the token expires within the refresh window (or already has)."""
            # Look ahead, not back: the token must be renewed while it is
            # still valid, otherwise keep-alive has nothing to extend.
            refresh_deadline = datetime.datetime.now(datetime.timezone.utc) + Session._REFRESH_WINDOW
            return self.token_expiration <= int(refresh_deadline.timestamp() * 1000)

        @property
        def basic_auth_token(self) -> str:
            """Base64 of ``username:password`` for an HTTP Basic authorization header."""
            credentials = f"{self.username}:{self.password}".encode("utf-8")
            return base64.b64encode(credentials).decode("utf-8")

    def __new__(cls, jamf_domain="", username="", password="", api_token="", token_expiration=0):
        """Return the shared session, creating or refreshing it as needed.

        The arguments are only used when no session exists yet.

        Raises:
            ValueError: No session exists and ``username`` or ``password``
                is empty.
        """
        if Session.__instance is None:
            if not username or not password:
                raise ValueError("Session not initialized, username and password field must be provided.")
            Session.__instance = Session._Session(jamf_domain, username, password, api_token, token_expiration)
        elif Session.__instance.is_expiring:
            Session.__instance = Session.__instance._get_refreshed_session()
        return Session.__instance

    @staticmethod
    def _parse_expiration_ms(expires: str) -> int:
        """Convert an ``expires`` timestamp string into epoch milliseconds.

        The string is interpreted as UTC (its ``Z`` suffix). Parsing it as a
        naive datetime would make ``timestamp()`` treat it as local time and
        shift the result by the host's UTC offset.

        Raises:
            ValueError: The string matches none of the accepted layouts.
        """
        for layout in Session._EXPIRES_FORMATS:
            try:
                parsed = datetime.datetime.strptime(expires, layout)
            except ValueError:
                continue
            return round(parsed.replace(tzinfo=datetime.timezone.utc).timestamp() * 1000)
        raise ValueError(f"Unrecognized token expiration timestamp: {expires!r}")

    @staticmethod
    def authenticate(jamf_domain: str, username: str, password: str):
        """Obtain a bearer token with basic auth and start the shared session.

        When a session already exists it is returned as is and no request is
        made.

        Raises:
            ConnectionError: The token endpoint answered with a status other
                than 200.
        """
        if Session.__instance is not None:
            return Session.__instance
        jamf_authentication_url = urllib.parse.urljoin(jamf_domain, "/api/v1/auth/token")
        basic_credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
        response = requests.post(
            url=jamf_authentication_url,
            headers={
                "accept": "application/json",
                "authorization": f"Basic {basic_credentials}",
            },
            timeout=Session._REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            # errors="replace" keeps a non-UTF-8 body from masking the real
            # failure with a UnicodeDecodeError.
            body = response.content.decode("utf-8", errors="replace")
            raise ConnectionError(f"Request to the endpoint failed with the error code {response.status_code}.\nBody: {body}")
        auth_response_json = response.json()
        expiration_timestamp = Session._parse_expiration_ms(auth_response_json["expires"])
        return Session(jamf_domain, username, password, auth_response_json["token"], expiration_timestamp)

    @staticmethod
    def invalidate() -> bool:
        """Invalidate the current token and drop the shared session.

        Returns:
            True when a session existed and the endpoint answered with HTTP
            204; False otherwise (the session, if any, is kept).
        """
        if Session.__instance is None:
            return False
        api_response = Session.__instance._call_api_endpoint("/api/v1/auth/invalidate-token")
        if api_response.status_code != 204:
            return False
        Session.__instance.token_is_expired = True
        Session.__instance = None
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment