Skip to content

Instantly share code, notes, and snippets.

@aviat
Created July 27, 2020 11:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aviat/3474ae19b7135094ddf73522d324b66c to your computer and use it in GitHub Desktop.
Save aviat/3474ae19b7135094ddf73522d324b66c to your computer and use it in GitHub Desktop.
import boto3
# https://docs.aws.amazon.com/firehose/latest/dev/writing-with-sdk.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.put_record
# https://docs.aws.amazon.com/lambda/latest/dg/services-alb.html
import base64
from collections import namedtuple
import hashlib
import hmac
import json
import os
def _answer(code, message):
    """Build the response dict that ``lambda_handler`` returns.

    ``code`` is passed through unchanged as ``statusCode``; ``message`` is
    serialized with ``json.dumps`` and stored under ``message``.
    """
    # NOTE(review): the ALB integration docs linked at the top of this file
    # describe a ``body`` response field -- confirm the caller really reads
    # ``message``.
    encoded_message = json.dumps(message)
    response = dict(statusCode=code, message=encoded_message)
    return response
def populate_clients():
    """Load client configurations from the environment into ``CLIENTS``.

    Reads the ``SQREEN_CLIENTS`` environment variable, parses it as a JSON
    object and stores one ``ClientConfiguration`` per entry, keyed by the
    entry's key. When the variable is not set, prints an error and leaves
    ``CLIENTS`` untouched.
    """
    raw_config = os.environ.get('SQREEN_CLIENTS')
    if raw_config is not None:
        # Each value supplies the keyword arguments of ClientConfiguration.
        for client_uuid, settings in json.loads(raw_config).items():
            CLIENTS[client_uuid] = ClientConfiguration(**settings)
        return
    print("Error: no SQREEN_CLIENTS environment variable found")
def lambda_handler(event, context):
    """Validate a signed webhook call and forward it to the client's stream.

    Args:
        event: Invocation event (a dict). The handler reads ``body`` (the raw
            request body string), ``headers['x-sqreen-integrity']`` (the
            signature of the body) and ``queryStringParameters['uuid']``
            (the key used to look the client up in ``CLIENTS``).
        context: Lambda context object; not used.

    Returns:
        The dict built by ``_answer``: status 200 once the record has been
        handed to Firehose, 404 when the UUID is unknown, and 500 for a
        missing UUID, an invalid signature or any error while forwarding.
    """
    # ``or {}`` covers both an absent key and an explicit null value, so a
    # request without headers or query string gets a regular error answer
    # instead of an unhandled KeyError/AttributeError.
    headers = event.get('headers') or {}
    query = event.get('queryStringParameters') or {}
    body = event.get('body')
    integrity = headers.get('x-sqreen-integrity')

    uuid = query.get('uuid')
    if uuid is None:
        return _answer(500, "No client UUID provided")
    client = CLIENTS.get(uuid)
    if client is None:
        return _answer(404, "Client UUID not found")

    # A request without a signature header or without a body cannot be
    # authenticated; reject it here rather than handing None to the HMAC code.
    if integrity is None or body is None:
        return _answer(500, "Invalid signature")
    if not check_signature(client.webhook_secret_key, integrity, body):
        return _answer(500, "Invalid signature")

    try:
        record = {
            "integrity": integrity,
            "body": json.loads(body),
        }
        # Separate entries with a new line in the destination file
        json_data = json.dumps(record) + "\n"
        aws_client.put_record(
            DeliveryStreamName=client.kinesis_stream,
            Record={'Data': json_data},
        )
    except Exception as e:
        # Top-level boundary: report any parsing/delivery failure to the caller.
        return _answer(500, 'error: %s' % e)
    return _answer(200, 'written %d bytes' % len(json_data))
def check_signature(secret_key, request_signature, request_body):
    """Check that ``request_signature`` is the HMAC-SHA256 of the body.

    Args:
        secret_key: Shared secret, as a ``str`` (encoded as UTF-8).
        request_signature: Hex digest sent with the request; may be ``None``
            when the caller found no signature.
        request_body: Raw request body, as a ``str`` (encoded as UTF-8).

    Returns:
        ``True`` only when the signature matches the hex HMAC-SHA256 digest of
        ``request_body`` keyed with ``secret_key``; ``False`` otherwise,
        including when the signature or the body is missing.
    """
    if not isinstance(request_signature, str) or request_body is None:
        return False
    try:
        # A hex digest is pure ASCII, so anything else cannot match. Encoding
        # first also avoids the TypeError compare_digest raises for non-ASCII
        # str arguments.
        provided = request_signature.encode("ascii")
    except UnicodeEncodeError:
        return False
    hasher = hmac.new(
        bytes(secret_key, "utf-8"),
        bytes(request_body, "utf-8"),
        hashlib.sha256,
    )
    expected = hasher.hexdigest().encode("ascii")
    # Constant-time comparison of the two digests.
    return hmac.compare_digest(expected, provided)
# Per-client settings record. The defaults apply to the last two fields only:
# name and organization_id can be empty (None); the first two are required.
ClientConfiguration = namedtuple(
    'ClientConfiguration',
    ('kinesis_stream', 'webhook_secret_key', 'name', 'organization_id'),
    defaults=(None, None),
)
def add_client(uuid, kinesis_stream, webhook_secret_key, name=None, organization_id=None):
    """Register a new client in ``CLIENTS`` and return the registry as JSON.

    Args:
        uuid: Key under which the client is stored in ``CLIENTS``.
        kinesis_stream: Stored as the client's ``kinesis_stream`` field.
        webhook_secret_key: Stored as the client's ``webhook_secret_key`` field.
        name: Optional; defaults to ``None`` like the ``ClientConfiguration``
            field it fills.
        organization_id: Optional; defaults to ``None`` like the
            ``ClientConfiguration`` field it fills.

    Returns:
        A JSON object string mapping every registered UUID (including the new
        one) to a dict of its configuration fields.

    Raises:
        ValueError: If ``uuid`` is already present in ``CLIENTS``.
    """
    if uuid in CLIENTS:
        raise ValueError("uuid already exists")
    CLIENTS[uuid] = ClientConfiguration(
        kinesis_stream=kinesis_stream,
        webhook_secret_key=webhook_secret_key,
        name=name,
        organization_id=organization_id,
    )
    registry = {key: dict(config._asdict()) for key, config in CLIENTS.items()}
    return json.dumps(registry)
# Firehose client, created once at import time and used by lambda_handler.
aws_client = boto3.client('firehose')
# Registry mapping client UUID -> ClientConfiguration. It must be defined
# before populate_clients() runs, because that function writes into it.
CLIENTS = {}
# Fill CLIENTS from the SQREEN_CLIENTS environment variable at import time.
populate_clients()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment