Skip to content

Instantly share code, notes, and snippets.

@mnanchev
Last active December 16, 2021 10:16
Show Gist options
  • Save mnanchev/8b539c46be3b64826640a173dacf1b98 to your computer and use it in GitHub Desktop.
Save mnanchev/8b539c46be3b64826640a173dacf1b98 to your computer and use it in GitHub Desktop.
import os
import json
import boto3
class QueueMessageSender:
def __init__(self) -> None:
self.s3_client = boto3.client("s3")
self.sqs_client = boto3.client("sqs")
def send_message_to_queue(self, queue_url, message):
try:
response = self.sqs_client.send_message(
QueueUrl=queue_url, MessageBody=json.dumps(message)
)
return response
except self.sqs_client.exceptions.InvalidMessageContents:
print("Invalid Message Contents")
return "Error the content of the message is invalid"
except self.sqs_client.exceptions.UnsupportedOperation:
print("Unsupported Operation")
return "unsupported operation"
def build_message(self, event):
response = self.s3_client.get_object_tagging(
Bucket=event["Records"][0]["s3"]["bucket"]["name"],
Key=event["Records"][0]["s3"]["object"]["key"],
)
return {
"result": response["TagSet"][0]["Value"],
"bucket": event["Records"][0]["s3"]["bucket"]["name"],
"key": event["Records"][0]["s3"]["object"]["key"],
}
@staticmethod
def check_if_infected(result):
if result == "infected":
return True
return False
class ObjectCleaner:
def __init__(self) -> None:
self.s3_client = boto3.client("s3")
def delete_object(self, bucket, key):
try:
self.s3_client.delete_object(Bucket=bucket, Key=key)
return "Object Deleted"
except self.s3_client.exceptions.NoSuchKey:
print("No Such Key")
return "No Such Key"
except self.s3_client.exceptions.ServerError:
print("Server Error")
return "Server Error"
except self.s3_client.exceptions.ClientError:
print("Client Error")
return "Client Error"
SCANNER_PROFILE_RESULT_QUEUE_URL = os.environ["SCANNER_PROFILE_RESULT_QUEUE_URL"]
SCANNER_VIDEO_RESULT_QUEUE_URL = os.environ["SCANNER_PROFILE_RESULT_QUEUE_URL"]
QUEUE_MESSAGE = QueueMessageSender()
OBJECT_CLEANER = ObjectCleaner()
def scanning_event_handler(event, __):
message_json = QUEUE_MESSAGE.build_message(event)
response = {"statusCode": 200, "body": json.dumps("Object is not infected")}
if (
"cdn" in message_json["bucket"]
and QueueMessageSender.check_if_infected(message_json["result"]) is True
):
QUEUE_MESSAGE.send_message_to_queue(
SCANNER_VIDEO_RESULT_QUEUE_URL, message_json
)
OBJECT_CLEANER.delete_object(message_json["bucket"], message_json["key"])
response = {
"statusCode": 200,
"body": json.dumps(
"Video message sent to queue. Infected Object was deleted"
),
}
elif QueueMessageSender.check_if_infected(message_json["result"]) is True:
QUEUE_MESSAGE.send_message_to_queue(
SCANNER_PROFILE_RESULT_QUEUE_URL, message_json
)
OBJECT_CLEANER.delete_object(message_json["bucket"], message_json["key"])
response = {
"statusCode": 200,
"body": json.dumps(
"Profile message sent to queue. Infected Object was deleted"
),
}
print(response)
return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment