Skip to content

Instantly share code, notes, and snippets.

@myersCody
Created July 1, 2024 19:11
Show Gist options
  • Save myersCody/0c57dc2132a7a712c035ae1210c30d20 to your computer and use it in GitHub Desktop.
Save myersCody/0c57dc2132a7a712c035ae1210c30d20 to your computer and use it in GitHub Desktop.
import os
import logging
import boto3
from botocore.config import Config
from botocore.exceptions import EndpointConnectionError
from botocore.exceptions import ClientError
# Module logger, named after the importing module.
LOG = logging.getLogger(__name__)

# Credentials and bucket name are read from the environment; each is None
# when the corresponding variable is not set.
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
S3_SECRET = os.environ.get("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")

# Fixed region and the endpoint URL handed to the S3 resource.
S3_REGION = "us-east-1"
S3_ENDPOINT = "https://s3.us-east-1.amazonaws.com"

# Key prefix scanned by the checks at the bottom of this script.
path_in_bucket = "2023/04/30/a678b047-f78e-4ad8-9fbe-1e3b73bd5a24"
def get_s3_resource(access_key, secret_key, region):  # pragma: no cover
    """Build a boto3 S3 service resource from explicit credentials.

    A session is created from ``access_key``, ``secret_key`` and ``region``;
    the resource obtained from it is pointed at the module-level
    ``S3_ENDPOINT`` and configured with a 60-second connect timeout.
    """
    session = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    return session.resource(
        "s3",
        endpoint_url=S3_ENDPOINT,
        config=Config(connect_timeout=60),
    )
def _get_s3_objects(s3_path):
    """Return the configured bucket's objects filtered with ``Prefix=s3_path``."""
    bucket = get_s3_resource(S3_ACCESS_KEY, S3_SECRET, S3_REGION).Bucket(S3_BUCKET_NAME)
    return bucket.objects.filter(Prefix=s3_path)
def get_s3_objects_not_matching_metadata(
    request_id, s3_path, *, metadata_key, metadata_value_check, context=None
) -> list[str]:
    """Return the keys under ``s3_path`` whose metadata does NOT match.

    Each object listed under the prefix is inspected with ``head_object``.
    Its key is collected when the metadata entry ``metadata_key`` differs
    from ``metadata_value_check`` (a missing entry reads as ``None``).

    Args:
        request_id: Caller-supplied identifier; only used in the failure log.
        s3_path: Key prefix to scan. A falsy value short-circuits to ``[]``.
        metadata_key: Name of the metadata entry to inspect.
        metadata_value_check: Value the entry is compared against.
        context: Optional mapping included in the failure log.

    Returns:
        The non-matching keys. On ``EndpointConnectionError`` or
        ``ClientError`` the failure is logged and ``[]`` is returned
        (best-effort; keys gathered before the failure are discarded).
    """
    if not s3_path:
        return []
    if context is None:
        context = {}
    try:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET,
            region_name=S3_REGION,
        )
        keys = []
        for obj_summary in _get_s3_objects(s3_path):
            response = s3_client.head_object(
                Bucket=obj_summary.bucket_name, Key=obj_summary.key
            )
            # Diagnostic dump of every inspected object.
            print("------")
            print(obj_summary.key)
            print(f"Metadata: {response['Metadata']}")
            print("\n")
            metadata_value = response["Metadata"].get(metadata_key)
            if metadata_value != metadata_value_check:
                keys.append(obj_summary.key)
        return keys
    except (EndpointConnectionError, ClientError) as err:
        # Still best-effort, but report what failed instead of a bare "ERROR!".
        LOG.warning(
            "Unable to inspect S3 object metadata under %s "
            "(request_id=%s, context=%s): %s",
            s3_path,
            request_id,
            context,
            err,
        )
        return []
def get_s3_objects_matching_metadata(
    request_id, s3_path, *, metadata_key, metadata_value_check, context=None
) -> list[str]:
    """Return the keys under ``s3_path`` whose metadata matches.

    Each object listed under the prefix is inspected with ``head_object``.
    Its key is collected when the metadata entry ``metadata_key`` equals
    ``metadata_value_check`` (a missing entry reads as ``None``).

    Args:
        request_id: Caller-supplied identifier; only used in the failure log.
        s3_path: Key prefix to scan. A falsy value short-circuits to ``[]``.
        metadata_key: Name of the metadata entry to inspect.
        metadata_value_check: Value the entry is compared against.
        context: Optional mapping included in the failure log.

    Returns:
        The matching keys. On ``EndpointConnectionError`` or ``ClientError``
        the failure is logged and ``[]`` is returned (best-effort; keys
        gathered before the failure are discarded).
    """
    if not s3_path:
        return []
    if context is None:
        context = {}
    try:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET,
            region_name=S3_REGION,
        )
        keys = []
        for obj_summary in _get_s3_objects(s3_path):
            response = s3_client.head_object(
                Bucket=obj_summary.bucket_name, Key=obj_summary.key
            )
            # Diagnostic dump of every inspected object.
            print("------")
            print(obj_summary.key)
            print(f"Metadata: {response['Metadata']}")
            print("\n")
            metadata_value = response["Metadata"].get(metadata_key)
            if metadata_value == metadata_value_check:
                keys.append(obj_summary.key)
        return keys
    except (EndpointConnectionError, ClientError) as err:
        # Still best-effort, but report what failed instead of a bare "ERROR!".
        LOG.warning(
            "Unable to inspect S3 object metadata under %s "
            "(request_id=%s, context=%s): %s",
            s3_path,
            request_id,
            context,
            err,
        )
        return []
# Run both filters over the same prefix and metadata pair, printing a banner
# before each call and the returned key list after it.
for _banner, _finder in (
    ("TEST NOT MATCHING", get_s3_objects_not_matching_metadata),
    ("TEST MATCHING", get_s3_objects_matching_metadata),
):
    print(_banner)
    results = _finder(
        1,
        path_in_bucket,
        metadata_key="test-key",
        metadata_value_check="1",
    )
    print(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment