Last active: December 1, 2022, 22:22
Save tschoonj/0ef6eed94271dac5685a99f084ea5ed4 to your computer and use it in GitHub Desktop.
Monitor Ceph S3 buckets for new files from Python with boto3 and pika
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import random
import string
import sys
import tempfile
import time
import urllib.parse  # explicit submodule import: bare `import urllib` does not expose .parse
from pathlib import Path
from threading import Event, Thread

import boto3
import pika
from botocore.client import Config
# ---------------------------------------------------------------------------
# Logging: timestamped INFO-level messages to stdout.
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_fmt_long = logging.Formatter(
    fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler_stream = logging.StreamHandler(sys.stdout)
log_handler_stream.setFormatter(log_fmt_long)
log_handler_stream.setLevel(logging.INFO)
logger.addHandler(log_handler_stream)


def _random_suffix(length: int = 10) -> str:
    """Return *length* random lowercase ASCII letters for unique resource names."""
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))


# Randomized names so repeated runs never collide with leftover resources.
BUCKET_NAME = 'test-bucket-' + _random_suffix()
TOPIC_NAME = 's3-bucket-monitor-' + _random_suffix()

# Required AMQP connection settings; a missing variable fails fast here.
AMQP_URL = os.environ['AMQP_URL']
AMQP_EXCHANGE = os.environ['AMQP_EXCHANGE']

# Shared credentials/endpoint settings for both boto3 clients.
client_options = {
    'aws_access_key_id': os.environ['AWS_ACCESS_KEY_ID'],
    'aws_secret_access_key': os.environ['AWS_SECRET_ACCESS_KEY'],
    'region_name': os.environ.get('AWS_REGION', ''),
    'endpoint_url': os.environ.get('CEPH_ENDPOINT_URL'),
}
s3_client = boto3.client('s3', **client_options)
# NOTE(review): the S3 signature version appears to be required by Ceph's
# SNS-compatible topic API — confirm against the Ceph deployment in use.
sns_client = boto3.client('sns', **client_options, config=Config(signature_version="s3"))
class BucketUploaderThread(Thread):
    """Background thread that uploads a small dummy file to BUCKET_NAME
    every ~3 seconds until ``should_exit`` is set to True.

    The exit flag is backed by a ``threading.Event`` so that (a) reads and
    writes are thread-safe across the main thread and this one, and (b) the
    inter-upload pause wakes immediately on shutdown instead of always
    sleeping the full 3 seconds.
    """

    def __init__(self):
        super().__init__()
        # Event instead of a plain bool: safe to set from another thread and
        # usable as an interruptible timer via .wait().
        self._should_exit = Event()

    @property
    def should_exit(self) -> bool:
        """True once the thread has been asked to stop."""
        return self._should_exit.is_set()

    @should_exit.setter
    def should_exit(self, value: bool) -> None:
        if value:
            self._should_exit.set()
        else:
            self._should_exit.clear()

    def run(self):
        """Upload test_0, test_1, ... from a throwaway temp dir until stopped."""
        counter = 0
        with tempfile.TemporaryDirectory() as tempdir:
            while not self._should_exit.is_set():
                our_file = Path(tempdir, f'test_{counter}')
                counter += 1
                our_file.write_text('ignore me')
                try:
                    logger.info(f'Uploading {our_file.name}')
                    s3_client.upload_file(str(our_file), BUCKET_NAME, our_file.name)
                except Exception:
                    # Best-effort: log and keep producing uploads.
                    logger.exception(f'Could not upload {our_file.name}')
                # Up to three seconds between uploads; returns early (True)
                # as soon as should_exit is set.
                self._should_exit.wait(3)
# Created up front so cleanup() can always reference it; only started later,
# once the bucket and the notification topic exist.
bucket_uploader_thread = BucketUploaderThread()
def cleanup():
    """Best-effort teardown, then terminate the process.

    Stops the uploader thread, empties and deletes the bucket, deletes the
    SNS topic (if it was ever created), and finally calls ``sys.exit(0)``.
    Each step is guarded so one failure does not block the others; cleanup()
    may be called before the thread was started or the topic existed.
    """
    # Stop the uploader and wait (bounded) for it to finish an in-flight upload.
    logger.info('Stopping thread')
    bucket_uploader_thread.should_exit = True
    if bucket_uploader_thread.is_alive():
        # join() returns as soon as the thread exits — at most ~4 s — unlike
        # a fixed sleep that always blocks the full duration.
        bucket_uploader_thread.join(timeout=4)

    # S3 refuses to delete a non-empty bucket, so delete every object first.
    try:
        logger.info(f'Deleting bucket {BUCKET_NAME}')
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=BUCKET_NAME):
            # 'Contents' is absent on an empty result page.
            for content in page.get('Contents', []):
                key = content['Key']
                s3_client.delete_object(Bucket=BUCKET_NAME, Key=key)
        s3_client.delete_bucket(Bucket=BUCKET_NAME)
    except Exception:
        logger.exception('Could not delete bucket')

    # topic_arn is only bound if setup progressed far enough to create it.
    if 'topic_arn' in globals():
        logger.info(f'Deleting topic {topic_arn}')
        try:
            sns_client.delete_topic(TopicArn=topic_arn)
        except Exception:
            logger.exception('Could not delete topic')

    sys.exit(0)
# Create the bucket to be monitored.  When no region is configured, the
# CreateBucketConfiguration argument must be omitted entirely.
try:
    logger.info(f'Creating bucket {BUCKET_NAME}')
    region = client_options['region_name']
    if region:
        s3_client.create_bucket(
            Bucket=BUCKET_NAME,
            CreateBucketConfiguration={"LocationConstraint": region},
        )
    else:
        s3_client.create_bucket(Bucket=BUCKET_NAME)
except Exception:
    # Without a bucket there is nothing to demo — tear down and exit.
    logger.exception('Could not create bucket')
    cleanup()
# The push endpoint and its AMQP options are handed to the topic as
# attributes, encoded as a query string that Ceph parses on its side.
endpoint_args = f"push-endpoint={AMQP_URL}&amqp-exchange={AMQP_EXCHANGE}&amqp-ack-level=broker"
# parse_qsl already yields (name, value) pairs, so dict() builds the mapping
# directly — no element-wise comprehension needed.  keep_blank_values ensures
# an option with an empty value is still forwarded.
attributes = dict(
    urllib.parse.parse_qsl(endpoint_args, keep_blank_values=True)
)
try:
    # Pika connection parameters come from the same URL Ceph will push to.
    connection_params = pika.connection.URLParameters(AMQP_URL)

    # Declare the durable topic exchange that Ceph will publish into.
    with pika.BlockingConnection(connection_params) as connection:
        with connection.channel() as channel:
            channel.exchange_declare(
                exchange=AMQP_EXCHANGE,
                exchange_type="topic",
                durable=True,
            )

    # Creating the SNS-style topic registers the push endpoint with Ceph.
    topic_arn = sns_client.create_topic(
        Name=TOPIC_NAME, Attributes=attributes
    )["TopicArn"]

    # Route every object-created event on the bucket to that topic.
    s3_client.put_bucket_notification_configuration(
        Bucket=BUCKET_NAME,
        NotificationConfiguration={
            "TopicConfigurations": [
                {
                    "TopicArn": topic_arn,
                    "Events": ["s3:ObjectCreated:*"],
                    "Id": "type-here-something-possibly-useful",  # Id is mandatory!
                },
            ],
        },
    )
except Exception:
    logger.exception('Could not create topic')
    cleanup()
# Give the freshly installed notification configuration a moment to take
# effect before the first upload happens.
time.sleep(1)
# start a thread that starts uploading files to the bucket
bucket_uploader_thread.start()
def on_message_cb(channel, method_frame, header_frame, _body):
    """Pika delivery callback: log the key of each newly created S3 object.

    Acknowledges every real delivery, then parses the bucket-notification
    payload; any payload that is not valid JSON or does not carry a record
    (we assume one record per message) is logged and ignored instead of
    raising out of the callback and killing the consumer loop.
    """
    if not method_frame:
        # Nothing delivered (e.g. a consumer-cancel notification).
        return
    # Ack first so a permanently malformed payload is not redelivered forever.
    channel.basic_ack(method_frame.delivery_tag)
    try:
        body = json.loads(_body)
        # we are going to assume 1 record per message
        record = body["Records"][0]
        event_name: str = record["eventName"]
    except Exception as e:
        logger.info(
            f"Ignoring {_body=} because of {str(e)}"
        )
        return
    if "ObjectCreated" in event_name:
        # new file created!  Keys arrive URL-encoded ('+' for spaces).
        object_info = record["s3"]['object']
        key = urllib.parse.unquote_plus(object_info['key'])
        logger.info(f'Found new object {key}')
# Consume from a fresh, exclusive, server-named queue bound to the topic's
# routing key: every upload from the background thread should surface here
# as an object-created notification.
with pika.BlockingConnection(connection_params) as connection:
    with connection.channel() as channel:
        declared = channel.queue_declare("", exclusive=True)
        queue_name = declared.method.queue
        channel.queue_bind(
            exchange=AMQP_EXCHANGE,
            queue=queue_name,
            routing_key=TOPIC_NAME,
        )
        channel.basic_consume(queue_name, on_message_cb)
        try:
            # Blocks until the user interrupts the demo with Ctrl-C.
            channel.start_consuming()
        except KeyboardInterrupt:
            logger.info('Ctrl-C caught!')
            channel.stop_consuming()

cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment