Created
November 27, 2019 04:20
-
-
Save taniacomputer/deece78ad7dd408d4d367245fcc08bc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# slackapp_run_matchSearch.py | |
# This function is invoked by an SNS topic | |
# and runs a Jamf Pro equivalent of an advanced search | |
# formatting the results and sending them back to Slack | |
# Author: @taniacomputer | |
# Last modified: 20/11/2019 | |
import json | |
from botocore.vendored import requests | |
import boto3 | |
import base64 | |
from botocore.exceptions import ClientError | |
import time | |
# Jamf Pro API URL | |
API_URL = "https://jamf.com/JSSResource/computers/match/" | |
# Secrets manager endpoint url and region name | |
SM_ENDPOINT_URL = "https://secretsmanager.xx-yyyy-z.amazonaws.com" | |
SM_REGION_NAME = "xx-yyyy-z" | |
SLACK_CHANNEL_ID = "ABCDEFG" | |
HEADERS = { | |
'Accept': 'application/json', | |
'Content-type': 'application/json', | |
} | |
def lambda_handler(event, context): | |
data_from_slack = event["Records"][0]["Sns"]["Message"] | |
json_loaded_data = json.loads(data_from_slack) | |
response_url = json_loaded_data["response_url"] | |
channel_id = json_loaded_data["channel_id"] | |
search_string = json_loaded_data["text"] | |
search_handler(channel_id, search_string, response_url) | |
def search_handler(channel_id, search_string, response_url): | |
response_json = {} | |
# Append the search string to the api url | |
api_url_complete = API_URL + search_string | |
# Verify that the request is coming from the right slack channel | |
if (channel_id == SLACK_CHANNEL_ID): | |
# Get Jamf Pro read only credentials | |
credentials = get_secret('jamf_slack_readonly') | |
api_request_response = requests.get(api_url_complete, headers=HEADERS, auth=(credentials["username"], credentials["password"])) | |
if api_request_response.status_code != 200: | |
response_text = "Can't contact the Jamf server :disappointed:\nVerify that its up and running, and try again." | |
response_json["text"] = response_text | |
response = requests.post( | |
response_url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
else: | |
data_from_jamf = api_request_response.json() | |
parse_data(data_from_jamf,search_string,response_url) | |
else: | |
response_text = ":warning: You must use `/jamf` while inside an authorized channel." | |
response_json["text"] = ":warning: `/jamf` can only be used from an authorized channel." | |
response = requests.post( | |
response_url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
def parse_data(data, search_string, url): | |
response_json = {} | |
size = len(data["computers"]) | |
# If there are more than 20 results then ask user to refine their search string | |
if size > 20: | |
response_json["text"] = ":warning: More than 20 results. Please refine your search string" | |
response = requests.post( | |
url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
elif size == 0: # no results found | |
response_json["text"] = ":sadparrot: No results found for " + search_string | |
response = requests.post( | |
url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
else: | |
response_json["text"] = "Total Results: *" + str(size) + "*\n" | |
response = requests.post( | |
url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
computers = data["computers"] | |
attachments = [] | |
for computer in computers: | |
name = computer["name"] | |
name_string = ":computer: *" + name + "*\n" | |
serial_number = computer["serial_number"] | |
serial_number_string = "*Serial Number:* " + serial_number + "\n" | |
jss_id = computer["id"] | |
username = computer["username"] | |
username_string = "*Primary User:* " + username + "\n" | |
computer_summary = name_string + username_string + serial_number_string | |
attachment = { | |
'callback_id': "id_search", | |
'text': computer_summary, | |
'actions' : [{ | |
'name': 'moreinfo', | |
'text': 'More Info', | |
'type': "button", | |
'value': str(jss_id) | |
}] | |
} | |
attachments.append(attachment) | |
response_json = { | |
'attachments' : attachments | |
} | |
response = requests.post( | |
url, data=json.dumps(response_json), | |
headers={'Content-Type': 'application/json'} | |
) | |
def get_secret(secret_name): | |
'''This function will grab the username and password from the secure creds store''' | |
session = boto3.session.Session() | |
client = session.client( | |
service_name='secretsmanager', | |
region_name=SM_REGION_NAME, | |
endpoint_url=SM_ENDPOINT_URL | |
) | |
try: | |
get_secret_value_response = client.get_secret_value( | |
SecretId=secret_name | |
) | |
except ClientError as e: | |
if e.response['Error']['Code'] == 'ResourceNotFoundException': | |
print("The requested secret " + secret_name + " was not found") | |
elif e.response['Error']['Code'] == 'InvalidRequestException': | |
print("The request was invalid due to:", e) | |
elif e.response['Error']['Code'] == 'InvalidParameterException': | |
print("The request had invalid params:", e) | |
else: | |
# Decrypted secret using the associated KMS CMK | |
# Depending on whether the secret was a string or binary, one of these fields will be populated | |
if 'SecretString' in get_secret_value_response: | |
secret = json.loads(get_secret_value_response['SecretString']) | |
return secret | |
else: | |
binary_secret_data = get_secret_value_response['SecretBinary'] | |
return binary_secret_data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment