Skip to content

Instantly share code, notes, and snippets.

@kuharan
Last active February 25, 2021 17:04
Show Gist options
  • Save kuharan/90bd52d5faa970c82717f13eaa4d6bec to your computer and use it in GitHub Desktop.
Save kuharan/90bd52d5faa970c82717f13eaa4d6bec to your computer and use it in GitHub Desktop.
AWS Lambda Function to automate DP APIs
import json
import requests
import time
from pytz import timezone
import datetime
status_dict = {'0':'Running','1':'Running with Errors','12':'Running with Failures','15':'Queueing with Failures'}
session_type_dict = {'0':'Backup'}
def login():
session = requests.session()
url = "https://xxx.xxx.xxx.com:xxx/auth/realms/DataProtector/protocol/openid-connect/token"
payload = {"username":"user_name|*|xxx.xxx.xxx.com", "password":"password", "client_id":"dp-gui", "grant_type":"password"}
response = json.loads(session.post(url, data = payload, verify = r"path_to_cert\xxx.xxx.xxx.com_cacert.pem").text)
access_token = response['access_token']
return access_token,session
def check_session(access_token, session):
url = "https://xxx.xxx.xxx.com:xxx/idb/sessions/filter/"
payload = "{\r\n \"filter\": {\r\n \"status\": [0,1,12,15],\r\n \"sessionType\": [0]\r\n }\r\n}"
headers = {
'Authorization': 'Bearer {}'.format(access_token),
'Content-Type': 'application/json'
}
response = json.loads(session.post(url, headers=headers, data = payload, verify = r"path_to_cert\xxx.xxx.xxx.com_cacert.pem").text)
return response
def check_duration(session_start_time, hours):
epoch_time = int(time.time())
if (epoch_time - session_start_time) > hours * 3600:
return True
else:
return False
def convert_epoch_to_datetime(epoch_time):
gmt_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch_time))
cst_time = datetime.datetime.strptime(gmt_time, '%Y-%m-%d %H:%M:%S').astimezone(timezone('US/Central'))
return str(cst_time)+" CT"
def lambda_handler(event, context):
access_token,session = login()
response = check_session(access_token, session)
long_running_sessions = []
for item in range(response['items_count']):
output={}
long_run_status = check_duration(session_start_time=response['items'][item]['startTime'], hours=2)
if long_run_status:
output['Name'] = response['items'][item]['name']
output['Datalist'] = response['items'][item]['datalist']
output['SessionType'] = session_type_dict[str(response['items'][item]['sessionType'])]
output['Status'] = status_dict[str(response['items'][item]['status'])]
output['StartTime'] = convert_epoch_to_datetime(response['items'][item]['startTime'])
output['Owner'] = response['items'][item]['owner']
long_running_sessions.append(output)
url = "https://api-gateway.us-east-1.amazonaws.com/default/lambda_name"
headers = {
'Content-Type': 'application/json'
}
response = requests.get(url, headers=headers, data = json.dumps(long_running_sessions))
print(response.text.encode('utf8'))
lambda_handler(None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment