Created
November 27, 2019 04:25
-
-
Save taniacomputer/329c2d9ff1666b63d5ddb7d55296b9b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# slackapp_idSearch.py
# This function is invoked by an SNS topic
# and runs a Jamf Pro equivalent of a computer id search,
# formatting the results and sending them back to Slack
# Author: @taniacomputer
# Last modified: 20/11/2019
import json
import re
import time
from datetime import datetime

import boto3
import dateutil.parser
from botocore.exceptions import ClientError
from botocore.vendored import requests
from dateutil import tz
# AWS Secrets Manager: region and the matching endpoint URL used by get_secret()
SM_REGION_NAME = "xx-yyyy-z"
SM_ENDPOINT_URL = "https://secretsmanager.xx-yyyy-z.amazonaws.com"

# Slack channel identifier
SLACK_CHANNEL_ID = "ABCDEFG"

# Jamf Pro computer-by-id resource; the computer id is appended to this URL
API_URL = "https://jamf.com/JSSResource/computers/id/"

# Headers sent with every Jamf Pro API request (JSON in, JSON out)
HEADERS = {'Accept': 'application/json', 'Content-type': 'application/json'}

# Slack emoji codes used to flag healthy / stale / informational lines
glad_emoji, sad_emoji, info_emoji = (
    ":white_check_mark:",
    ":warning:",
    ":information_source:",
)
def lambda_handler(event, context):
    """Lambda entry point: unpack the SNS-delivered Slack payload and run the search.

    The first SNS record's ``Message`` is a JSON document; the first action's
    ``value`` is used as the computer id to search for and ``response_url``
    is where the answer is sent. ``context`` is unused. Returns None.
    """
    sns_message = event["Records"][0]["Sns"]["Message"]
    slack_payload = json.loads(sns_message)
    search_handler(
        slack_payload["actions"][0]["value"],
        slack_payload["response_url"],
    )
def search_handler(search_id, response_url):
    """Look up a computer by id in Jamf Pro and report the outcome to Slack.

    Args:
        search_id: Computer id (string) appended to ``API_URL``.
        response_url: Slack response URL that receives the message.

    Returns:
        On a non-200 Jamf response, the error payload (a dict with a ``text``
        key) that was posted to Slack. Otherwise whatever ``parse_data``
        returns for the Jamf record.
    """
    # Append the search string to the api url
    api_url_complete = API_URL + search_id
    credentials = get_secret('jamf_slack_readonly')
    api_request_response = requests.get(
        api_url_complete,
        headers=HEADERS,
        auth=(credentials["username"], credentials["password"]),
    )

    if api_request_response.status_code != 200:
        response_json = {
            "text": "Can't locate *" + search_id + "* in the JSS :sadparrot:\nVerify that the serialnumber exists in the JSS and try again."
        }
        # The failure text was previously built but never delivered, so the
        # Slack user got no feedback; post it to the same response URL.
        requests.post(
            response_url, data=json.dumps(response_json),
            headers={'Content-Type': 'application/json'}
        )
        return response_json

    # Success path: parse_data formats the record and posts it to Slack.
    # Returning its result avoids the unbound `response_json` the original
    # hit here on every successful lookup.
    data_from_jamf = api_request_response.json()
    return parse_data(data_from_jamf, response_url)
# Errors that mean "this optional field is missing or malformed"; anything
# else (e.g. KeyboardInterrupt, programming errors) is allowed to propagate.
_FIELD_ERRORS = (KeyError, TypeError, ValueError, OverflowError, AttributeError)


def _warranty_line(computer):
    """Return the quoted Slack line for the hardware warranty end date."""
    try:
        warranty_date = computer["purchasing"]["warranty_expires"]
        # Compare by value: `is ""` tests object identity, not emptiness.
        if warranty_date == "":
            return "> *Hardware warranty end date*: Unavailable" + "\n"
        warranty_date_datetime = datetime.strptime(warranty_date, '%Y-%m-%d')
        warranty_date_formatted = warranty_date_datetime.strftime("%B %d, %Y")
        return "> *Hardware warranty end date*: " + warranty_date_formatted + "\n"
    except _FIELD_ERRORS:
        return "> *Hardware warranty end date*: information unavailable" + "\n"


def _age_summary(title, computer, epoch_key, utc_key, now_epoch,
                 stale_after_minutes, stale_in_days=False):
    """Return the Slack section describing how long ago an event was recorded.

    Reads ``computer["general"][epoch_key]`` (milliseconds) and
    ``computer["general"][utc_key]`` (date string). Events older than
    ``stale_after_minutes`` get the warning emoji and are reported in whole
    days when ``stale_in_days`` is true, otherwise in minutes. Missing or
    unparsable data yields a "Not recorded" section.
    """
    try:
        general = computer["general"]
        event_epoch = general[epoch_key]
        event_datetime = dateutil.parser.parse(general[utc_key])
        minutes = round((now_epoch - event_epoch) // 60000)

        if minutes > stale_after_minutes:
            emoji = sad_emoji
            if stale_in_days:
                age = str(minutes // 1440) + " day/s ago."
            else:
                age = str(minutes) + " minutes ago."
        else:
            emoji = glad_emoji
            if minutes < 1:
                age = " Less than one minute ago."
            elif minutes < 2:
                age = " About a minute ago."
            else:
                age = str(minutes) + " minutes ago."

        # NOTE(review): the heading says (GMT+10) but the parsed timestamp is
        # formatted as-is, with no timezone conversion - confirm which is intended.
        stamp = event_datetime.strftime("%Y-%m-%d %H:%M:%S")
        return ":clock330: *" + title + "* (GMT+10)\n" + stamp + "\n>" + emoji + " " + age + "\n"
    except _FIELD_ERRORS:
        return ":clock330: *" + title + "*\n>" + sad_emoji + " Not recorded\n"


def parse_data(data, response_url):
    """Format a Jamf Pro computer record as a Slack message and post it.

    Args:
        data: Decoded Jamf response; the record is read from ``data["computer"]``.
        response_url: Slack response URL the message is POSTed to.

    Returns:
        The payload dict (with a ``text`` key) that was posted.

    Raises:
        KeyError: if a required section or field of the record is absent
            (name, serial number, model, OS version, username, group
            memberships, extension attributes).
    """
    computer = data["computer"]
    general = computer["general"]
    hardware = computer["hardware"]
    name = general["name"]

    now_epoch = time.time() * 1000  # milliseconds, matching the *_epoch fields

    # Inventory is flagged stale after a day (1440 min), check-in after 15 min.
    last_recon_summary = _age_summary(
        "Last Inventory Update", computer,
        "report_date_epoch", "report_date_utc",
        now_epoch, 1440, stale_in_days=True,
    )
    last_contact_summary = _age_summary(
        "Last Check-in", computer,
        "last_contact_time_epoch", "last_contact_time_utc",
        now_epoch, 15,
    )

    computer_summary = "".join([
        ":computer: *General Mac Info*\n",
        "> *Name:* " + name + "\n",
        "> *Primary User:* " + computer["location"]["username"] + "\n",
        "> *Serial Number:* " + general["serial_number"] + "\n",
        "> *Model:* " + hardware["model"] + "\n",
        _warranty_line(computer),
        "> *macOS version: * " + str(hardware["os_version"]) + "\n",
        "\n",
        last_recon_summary,
        last_contact_summary,
        "\n",
    ])

    # Look for any groups that this computer is a member of that has a warning
    # emoji in the group name, for display in the health summary
    group_accounts = computer["groups_accounts"]["computer_group_memberships"]
    group_string = "".join(
        group.strip().replace("⚠️", "⚠️ Member of *") + "*\n"
        for group in group_accounts
        if "⚠️" in group
    )

    # Look through all the extension attributes for this computer and pick out
    # those with 'slack - ' in the name, for display in the health summary
    ea_sections = []
    for ea in computer["extension_attributes"]:
        ea_name = ea["name"]
        if "slack - " not in ea_name:
            continue
        ea_name = ea_name.replace("slack - ", "").replace("$computername", name)
        # Insert U+034F before every period; presumably this keeps Slack from
        # auto-linking dotted values - TODO confirm.
        ea_value_formatted = ea["value"].replace(".", '\u034F.')
        # Keep multi-line values inside the Slack quote block.
        ea_value_formatted = ea_value_formatted.strip().replace("\n", "\n>")
        ea_sections.append(" *" + ea_name + "*\n>" + ea_value_formatted + "\n\n")
    ea_string = "".join(ea_sections)

    # 'replace_original' is left at Slack's default (True). Setting it to
    # False would stop this text from replacing the button attachment.
    response_json = {'text': computer_summary + ea_string + group_string}
    requests.post(
        response_url, data=json.dumps(response_json),
        headers={'Content-Type': 'application/json'}
    )
    return response_json
def get_secret(secret_name):
    """Fetch a secret from AWS Secrets Manager.

    Args:
        secret_name: Id of the secret to retrieve.

    Returns:
        The JSON-decoded ``SecretString`` when the secret is a string,
        otherwise the raw ``SecretBinary`` value.

    Raises:
        botocore.exceptions.ClientError: if the secret cannot be retrieved.
            Known error codes are printed first, then the error is re-raised
            so the caller fails here instead of later on a None result.
    """
    # This function will grab the username and password from the secure creds store
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=SM_REGION_NAME,
        endpoint_url=SM_ENDPOINT_URL
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            print("The requested secret " + secret_name + " was not found")
        elif error_code == 'InvalidRequestException':
            print("The request was invalid due to:", e)
        elif error_code == 'InvalidParameterException':
            print("The request had invalid params:", e)
        raise

    # Decrypted secret using the associated KMS CMK
    # Depending on whether the secret was a string or binary, one of these fields will be populated
    if 'SecretString' in get_secret_value_response:
        return json.loads(get_secret_value_response['SecretString'])
    return get_secret_value_response['SecretBinary']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment