Created
November 27, 2019 04:18
-
-
Save taniacomputer/ef9d87ec5f0895058a1dfbd5f4d185bb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# slackapp_initial_slash_command_invoked.py | |
# This function is invoked from a Slack slash command | |
# It verifies the request, responds to it, and invokes the SNS topic for the | |
# second lambda function, if required. | |
# Author: @taniacomputer | |
# Last modified: 20/11/2019 | |
import json | |
import hashlib | |
import hmac | |
import boto3 | |
from botocore.exceptions import ClientError | |
import base64 | |
from urllib import parse as urlparse | |
import time | |
helper_text = """ | |
:applemac: | |
`/jamf <search string>` | |
Jamf Bot uses the search string to look for any matching Macs in Jamf. | |
:female-detective::skin-tone-3: It searches for a match in each Mac's: | |
>computer name | |
>serial number | |
>IP address | |
>MAC address | |
>username | |
>user full name | |
>email address | |
You can add an asterisk `*` in your string as a search wildcard. | |
`/jamf tania*` | |
`/jamf user@email.com` | |
>returns any Macs assigned to a user with that email address | |
`/jamf tania*` | |
>returns any Macs assigned to a user with first name Tania | |
- There is a limit of 20 results per search string, so try and keep it specific. | |
- From the results, click the `More Info` button to view an extended summary for that particular Mac. | |
- In the extended summary, any items with a warning emoji :warning: requires further investigation and possible action. | |
:parrot: | |
""" | |
# Simple notification service ARN | |
SNS_TOPIC_ARN = 'arn:aws:sns:xx-yyyy-z:1234567890:slackapp_run_matchSearch' | |
# Secrets manager endpoint url and region name | |
SM_ENDPOINT_URL = "https://secretsmanager.xx-yyyy-z.amazonaws.com" | |
SM_REGION_NAME = "xx-yyyy-z" | |
SLACK_CHANNEL_ID = "ABCDEFG" | |
def lambda_handler(event, context): | |
request_body = event["body"] | |
request_body_parsed = dict(urlparse.parse_qsl(request_body)) | |
channel_id = request_body_parsed["channel_id"] | |
if (channel_id == SLACK_CHANNEL_ID): | |
delivered_signature = event["headers"]['X-Slack-Signature'] | |
slack_request_timestamp = event["headers"]['X-Slack-Request-Timestamp'] | |
# Create hmac signature and return string representation | |
slack_signing_secret = get_secret('jamf_slackapp_secret') | |
slack_signing_secret = bytes(slack_signing_secret["secret"], 'utf-8') | |
basestring = f"v0:{slack_request_timestamp}:{request_body}".encode('utf-8') | |
expected_signature = 'v0=' + hmac.new(slack_signing_secret, basestring, hashlib.sha256).hexdigest() | |
# confirm that message is not more than 5 minutes old | |
current_time = time.time() | |
slack_request_timestamp_asFloat = float(slack_request_timestamp) | |
if (current_time - slack_request_timestamp_asFloat) > 300: | |
response_text = "Message more than 5 minutes old" | |
response_code = 412 | |
# Confirm that delivered signature is the same as the expected_signature | |
elif hmac.compare_digest(expected_signature, delivered_signature): | |
try: | |
search_string = request_body_parsed["text"] | |
except KeyError: | |
# catches if no search string parameter is provided | |
search_string = "" | |
# hooray, signature strings match, the request came from Slack! | |
if search_string == "" or search_string == "help": | |
response_text = helper_text | |
response_code = 200 | |
else: | |
slack_ready_search_string = search_string.replace("*","\u034F*") | |
response_text = ":female-detective::skin-tone-3: looking up _" + slack_ready_search_string + "_..." | |
response_code = 200 | |
# Publish to the SNS topic | |
client = boto3.client('sns') | |
trigger = client.publish( | |
TargetArn = SNS_TOPIC_ARN, | |
Message=json.dumps({'default': json.dumps(request_body_parsed)}), | |
MessageStructure='json') | |
else: | |
response_text = "Message signature is invalid" | |
response_code = 412 | |
else: | |
response_text = ":warning: You must use `/jamf` while inside an authorized channel." | |
# Returning status code of 200 so that response text is presented to user | |
response_code = 200 | |
return { | |
'statusCode': response_code, | |
'body': response_text | |
} | |
def get_secret(secret_name): | |
#This function will grab the username and password from the secure creds store | |
session = boto3.session.Session() | |
client = session.client( | |
service_name='secretsmanager', | |
region_name=SM_REGION_NAME, | |
endpoint_url=SM_ENDPOINT_URL | |
) | |
try: | |
get_secret_value_response = client.get_secret_value( | |
SecretId=secret_name | |
) | |
except ClientError as e: | |
if e.response['Error']['Code'] == 'ResourceNotFoundException': | |
print("The requested secret " + secret_name + " was not found") | |
elif e.response['Error']['Code'] == 'InvalidRequestException': | |
print("The request was invalid due to:", e) | |
elif e.response['Error']['Code'] == 'InvalidParameterException': | |
print("The request had invalid params:", e) | |
else: | |
# Decrypted secret using the associated KMS CMK | |
# Depending on whether the secret was a string or binary, one of these fields will be populated | |
if 'SecretString' in get_secret_value_response: | |
secret = json.loads(get_secret_value_response['SecretString']) | |
return secret | |
else: | |
binary_secret_data = get_secret_value_response['SecretBinary'] | |
return binary_secret_data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment