Skip to content

Instantly share code, notes, and snippets.

@taniacomputer
Created November 27, 2019 04:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taniacomputer/deece78ad7dd408d4d367245fcc08bc8 to your computer and use it in GitHub Desktop.
Save taniacomputer/deece78ad7dd408d4d367245fcc08bc8 to your computer and use it in GitHub Desktop.
# slackapp_run_matchSearch.py
# This function is invoked by an SNS topic.
# It runs the Jamf Pro equivalent of an advanced search,
# formats the results, and sends them back to Slack.
# Author: @taniacomputer
# Last modified: 20/11/2019
import json
from botocore.vendored import requests
import boto3
import base64
from botocore.exceptions import ClientError
import time
# Base URL of the Jamf Pro computer "match" endpoint; the search text is
# appended to it to form the request URL.
API_URL = "https://jamf.com/JSSResource/computers/match/"

# AWS Secrets Manager region and endpoint used when fetching credentials.
SM_REGION_NAME = "xx-yyyy-z"
SM_ENDPOINT_URL = "https://secretsmanager.xx-yyyy-z.amazonaws.com"

# Requests are only served when they come from this Slack channel.
SLACK_CHANNEL_ID = "ABCDEFG"

# Headers sent with the Jamf Pro API request (JSON in, JSON out).
HEADERS = {
    "Accept": "application/json",
    "Content-type": "application/json",
}
def lambda_handler(event, context):
    """Lambda entry point: unpack the Slack payload from the SNS event and search.

    The ``Message`` of the first SNS record is parsed as JSON, and its
    ``response_url``, ``channel_id`` and ``text`` fields are forwarded to
    ``search_handler``. ``context`` is not used.
    """
    sns_message = event["Records"][0]["Sns"]["Message"]
    payload = json.loads(sns_message)

    # Fields are read in this fixed order so a missing key surfaces the same
    # KeyError regardless of which other keys are present.
    response_url, channel_id, search_string = (
        payload[field] for field in ("response_url", "channel_id", "text")
    )
    search_handler(channel_id, search_string, response_url)
def search_handler(channel_id, search_string, response_url):
    """Run a Jamf Pro computer match search and report the outcome to Slack.

    Requests from any channel other than ``SLACK_CHANNEL_ID`` are refused with
    a warning. Otherwise the read-only Jamf credentials are fetched with
    ``get_secret``, the match endpoint is queried, and the decoded JSON result
    is handed to ``parse_data``. Failures are reported through
    ``response_url`` instead of being raised.

    Args:
        channel_id: ID of the Slack channel the command was issued from.
        search_string: Raw search text typed by the Slack user.
        response_url: Slack URL to which replies are POSTed as JSON.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    from urllib.parse import quote

    # Seconds to wait on any single HTTP call before giving up.
    request_timeout = 10

    def reply(text):
        """POST a plain text message back to Slack."""
        requests.post(
            response_url, data=json.dumps({"text": text}),
            headers={'Content-Type': 'application/json'},
            timeout=request_timeout
        )

    # Verify that the request is coming from the right slack channel
    if channel_id != SLACK_CHANNEL_ID:
        reply(":warning: `/jamf` can only be used from an authorized channel.")
        return

    # Get Jamf Pro read only credentials. get_secret() can return None (on a
    # Secrets Manager error) or a non-dict value, so guard the lookups.
    credentials = get_secret('jamf_slack_readonly')
    try:
        auth = (credentials["username"], credentials["password"])
    except (TypeError, KeyError):
        reply(":warning: Unable to load the Jamf credentials. Please contact an administrator.")
        return

    # The search text is untrusted user input: percent-encode it so that
    # characters such as "/", "?" or "#" cannot alter the request path.
    # "*" (wildcard) and "@" (email searches) are left readable.
    api_url_complete = API_URL + quote(search_string, safe="*@")

    jamf_unreachable = "Can't contact the Jamf server :disappointed:\nVerify that its up and running, and try again."
    try:
        api_request_response = requests.get(
            api_url_complete, headers=HEADERS, auth=auth,
            timeout=request_timeout
        )
    except requests.exceptions.RequestException:
        # Connection errors and timeouts get the same reply as a bad status.
        reply(jamf_unreachable)
        return

    if api_request_response.status_code != 200:
        reply(jamf_unreachable)
        return

    parse_data(api_request_response.json(), search_string, response_url)
def _post_slack_json(url, payload):
    """POST ``payload`` to ``url`` as a JSON body."""
    requests.post(
        url, data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        timeout=10
    )


def _computer_attachment(computer):
    """Build the Slack attachment (summary text and "More Info" button) for one computer.

    Missing or null ``name``, ``username`` and ``serial_number`` fields are
    rendered as empty strings rather than raising; ``id`` is required because
    the button is useless without it.
    """
    def field(key):
        value = computer.get(key)
        return "" if value is None else str(value)

    computer_summary = (
        ":computer: *" + field("name") + "*\n"
        + "*Primary User:* " + field("username") + "\n"
        + "*Serial Number:* " + field("serial_number") + "\n"
    )
    return {
        'callback_id': "id_search",
        'text': computer_summary,
        'actions': [{
            'name': 'moreinfo',
            'text': 'More Info',
            'type': "button",
            'value': str(computer["id"])
        }]
    }


def parse_data(data, search_string, url, max_results=20):
    """Format Jamf Pro match results and send them to Slack.

    Args:
        data: Decoded JSON from the Jamf Pro match endpoint; must contain a
            ``computers`` list.
        search_string: The text the user searched for (echoed when nothing
            matches).
        url: Slack response URL the messages are POSTed to.
        max_results: Largest result count that is listed; beyond it the user
            is asked to refine the search. Defaults to 20.
    """
    computers = data["computers"]
    size = len(computers)

    # If there are too many results then ask user to refine their search string
    if size > max_results:
        _post_slack_json(url, {
            "text": ":warning: More than " + str(max_results)
                    + " results. Please refine your search string"
        })
        return

    if size == 0:  # no results found
        _post_slack_json(url, {
            "text": ":sadparrot: No results found for " + search_string
        })
        return

    # First message: the result count. Second message: one attachment per
    # computer, each carrying a "More Info" button.
    _post_slack_json(url, {"text": "Total Results: *" + str(size) + "*\n"})
    _post_slack_json(url, {
        'attachments': [_computer_attachment(computer) for computer in computers]
    })
def get_secret(secret_name):
    """Fetch a secret from AWS Secrets Manager.

    Args:
        secret_name: Name (``SecretId``) of the secret to retrieve.

    Returns:
        The JSON-decoded ``SecretString`` when the secret is stored as text,
        the raw ``SecretBinary`` value when it is stored as binary, or
        ``None`` when Secrets Manager responds with a ``ClientError`` (the
        error is printed so it shows up in the function's log output).
    """
    client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=SM_REGION_NAME,
        endpoint_url=SM_ENDPOINT_URL
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            print("The requested secret " + secret_name + " was not found")
        elif error_code == 'InvalidRequestException':
            print("The request was invalid due to:", e)
        elif error_code == 'InvalidParameterException':
            print("The request had invalid params:", e)
        else:
            # Previously any other error code was swallowed without a trace.
            print("Unable to retrieve secret " + secret_name + ":", e)
        return None

    # Depending on whether the secret was a string or binary, one of these
    # fields will be populated.
    if 'SecretString' in get_secret_value_response:
        return json.loads(get_secret_value_response['SecretString'])
    return get_secret_value_response['SecretBinary']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment