Monitor Ceph S3 buckets for new files from Python with boto3 and pika
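The script expects AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AMQP_URL and AMQP_EXCHANGE to be set in the environment; AWS_REGION and CEPH_ENDPOINT_URL may be left unset, in which case boto3 falls back to its defaults.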
import os
import boto3
from botocore.client import Config
import sys
import string
import random
import logging
import time
import json
import urllib.parse
from threading import Thread
import tempfile
from pathlib import Path
import pika
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_fmt_long = logging.Formatter(
    fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler_stream = logging.StreamHandler(sys.stdout)
log_handler_stream.setFormatter(log_fmt_long)
log_handler_stream.setLevel(logging.INFO)
logger.addHandler(log_handler_stream)
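
# random suffixes keep the bucket and topic names unique between runs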
BUCKET_NAME = 'test-bucket-' + ''.join(random.choice(string.ascii_lowercase) for i in range(10))
TOPIC_NAME = 's3-bucket-monitor-' + ''.join(random.choice(string.ascii_lowercase) for i in range(10))
AMQP_URL = os.environ['AMQP_URL']
AMQP_EXCHANGE = os.environ['AMQP_EXCHANGE']
client_options = {
    'aws_access_key_id': os.environ['AWS_ACCESS_KEY_ID'],
    'aws_secret_access_key': os.environ['AWS_SECRET_ACCESS_KEY'],
    'region_name': os.environ.get('AWS_REGION', ''),
    'endpoint_url': os.environ.get('CEPH_ENDPOINT_URL'),
}
s3_client = boto3.client('s3', **client_options)
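# topics are managed through the SNS API; Ceph RGW expects the legacy 's3' signature version for these calls, hence the explicit Config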
sns_client = boto3.client('sns', **client_options, config=Config(signature_version="s3"))
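
# background thread that keeps uploading small test files to the bucket, so there is something for the notifications to fire on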
class BucketUploaderThread(Thread):
    def __init__(self):
        super().__init__()
        self._should_exit: bool = False

    @property
    def should_exit(self):
        return self._should_exit

    @should_exit.setter
    def should_exit(self, value: bool):
        self._should_exit = value

    def run(self):
        counter = 0
        with tempfile.TemporaryDirectory() as tempdir:
            while True:
                if self.should_exit:
                    return
                our_file = Path(tempdir, f'test_{counter}')
                counter += 1
                our_file.write_text('ignore me')
                try:
                    logger.info(f'Uploading {our_file.name}')
                    s3_client.upload_file(str(our_file), BUCKET_NAME, our_file.name)
                except Exception:
                    logger.exception(f'Could not upload {our_file.name}')
                # three seconds between uploads
                time.sleep(3)
bucket_uploader_thread = BucketUploaderThread()
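
# best-effort teardown: stop the uploader, empty and delete the bucket, then remove the topic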
def cleanup():
    # stop the uploader thread and give its last upload time to finish
    logger.info('Stopping thread')
    bucket_uploader_thread.should_exit = True
    time.sleep(4)

    # delete the bucket and everything in it
    try:
        logger.info(f'Deleting bucket {BUCKET_NAME}')
        paginator = s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=BUCKET_NAME)
        for page in page_iterator:
            for content in page.get('Contents', []):
                s3_client.delete_object(Bucket=BUCKET_NAME, Key=content['Key'])
        s3_client.delete_bucket(Bucket=BUCKET_NAME)
    except Exception:
        logger.exception('Could not delete bucket')

    # delete the topic, if it was ever created
    if 'topic_arn' in globals():
        logger.info(f'Deleting topic {topic_arn}')
        try:
            sns_client.delete_topic(TopicArn=topic_arn)
        except Exception:
            logger.exception('Could not delete topic')

    sys.exit(0)
# create a bucket
try:
    logger.info(f'Creating bucket {BUCKET_NAME}')
    if client_options['region_name'] == "":
        s3_client.create_bucket(Bucket=BUCKET_NAME)
    else:
        location = {"LocationConstraint": client_options['region_name']}
        s3_client.create_bucket(
            Bucket=BUCKET_NAME,
            CreateBucketConfiguration=location,
        )
except Exception:
    logger.exception('Could not create bucket')
    cleanup()
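
# the push endpoint (broker URL, exchange and ack level) is handed to Ceph as topic attributes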
# generate URL query with endpoint_args
endpoint_args = f"push-endpoint={AMQP_URL}&amqp-exchange={AMQP_EXCHANGE}&amqp-ack-level=broker"
# parse the query string into the attribute dict expected by create_topic
attributes = {
    nvp[0]: nvp[1]
    for nvp in urllib.parse.parse_qsl(
        endpoint_args, keep_blank_values=True
    )
}
try:
    # configure the Pika AMQP client
    connection_params = pika.connection.URLParameters(AMQP_URL)

    # create the exchange the notifications will be published to
    with pika.BlockingConnection(connection_params) as connection:
        with connection.channel() as channel:
            channel.exchange_declare(
                exchange=AMQP_EXCHANGE,
                exchange_type="topic",
                durable=True,
            )

    # create a topic that pushes to the AMQP endpoint
    resp = sns_client.create_topic(Name=TOPIC_NAME, Attributes=attributes)
    topic_arn = resp["TopicArn"]

    # attach the topic to the bucket: notify on every object-creation event
    topic_conf_list = [
        {
            "TopicArn": topic_arn,
            "Events": ["s3:ObjectCreated:*"],
            "Id": "type-here-something-possibly-useful",  # Id is mandatory!
        },
    ]
    s3_client.put_bucket_notification_configuration(
        Bucket=BUCKET_NAME,
        NotificationConfiguration={
            "TopicConfigurations": topic_conf_list,
        },
    )
except Exception:
    logger.exception('Could not create topic')
    cleanup()

time.sleep(1)
# start a thread that starts uploading files to the bucket
bucket_uploader_thread.start()
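
# callback invoked by pika for every bucket notification delivered to our queue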
def on_message_cb(channel, method_frame, header_frame, _body):
    if method_frame:
        body = json.loads(_body)
        channel.basic_ack(method_frame.delivery_tag)
        # we are going to assume 1 record per message
        try:
            record = body["Records"][0]
            event_name: str = record["eventName"]
        except Exception as e:
            logger.info(
                f"Ignoring {_body=} because of {str(e)}"
            )
            return
        if "ObjectCreated" in event_name:
            # new file created!
            s3_info = record["s3"]
            object_info = s3_info['object']
            key = urllib.parse.unquote_plus(object_info['key'])
            logger.info(f'Found new object {key}')
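
# bind an exclusive, auto-named queue to the exchange with the topic name as routing key, then consume until Ctrl-C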
with pika.BlockingConnection(connection_params) as connection:
    with connection.channel() as channel:
        result = channel.queue_declare("", exclusive=True)
        queue_name = result.method.queue
        channel.queue_bind(
            exchange=AMQP_EXCHANGE,
            queue=queue_name,
            routing_key=TOPIC_NAME,
        )
        channel.basic_consume(queue_name, on_message_cb)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            logger.info('Ctrl-C caught!')
            channel.stop_consuming()

cleanup()