Created
July 1, 2024 19:11
-
-
Save myersCody/0c57dc2132a7a712c035ae1210c30d20 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import boto3 | |
from botocore.config import Config | |
from botocore.exceptions import EndpointConnectionError | |
from botocore.exceptions import ClientError | |
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Credentials and bucket name are read from the environment; each is None
# when the corresponding variable is unset.
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
S3_SECRET = os.environ.get("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")

# The region is fixed, and the endpoint URL is built from it.
S3_REGION = "us-east-1"
S3_ENDPOINT = f"https://s3.{S3_REGION}.amazonaws.com"

# Key prefix inside the bucket that the checks at the bottom of the file scan.
path_in_bucket = "2023/04/30/a678b047-f78e-4ad8-9fbe-1e3b73bd5a24"
def get_s3_resource(access_key, secret_key, region):  # pragma: no cover
    """Create an S3 service resource from explicit credentials.

    A boto3 session is built from the given access key, secret key and
    region; the resource is then created against ``S3_ENDPOINT`` with a
    60-second connect timeout.
    """
    session = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    return session.resource(
        "s3",
        endpoint_url=S3_ENDPOINT,
        config=Config(connect_timeout=60),
    )
def _get_s3_objects(s3_path):
    """Return the objects in ``S3_BUCKET_NAME`` whose key starts with *s3_path*.

    The resource is created with the module-level credentials and region.
    """
    bucket = get_s3_resource(S3_ACCESS_KEY, S3_SECRET, S3_REGION).Bucket(S3_BUCKET_NAME)
    return bucket.objects.filter(Prefix=s3_path)
def _filter_s3_keys_by_metadata(
    request_id, s3_path, *, metadata_key, metadata_value_check, want_match, context=None
) -> list[str]:
    """Shared implementation behind the two public metadata filters.

    Issues a HEAD request for every object under *s3_path* and compares the
    value stored under *metadata_key* in the response's ``Metadata`` mapping
    with *metadata_value_check*. A key that is absent from the mapping
    compares as ``None``.

    Args:
        request_id: Identifier included in the error log message only.
        s3_path: Key prefix to scan; a falsy value short-circuits to ``[]``.
        metadata_key: Name of the metadata entry to inspect.
        metadata_value_check: Value the metadata entry is compared against.
        want_match: ``True`` keeps objects whose value equals the check
            value, ``False`` keeps those whose value differs.
        context: Optional extra data included in the error log message.

    Returns:
        The keys of the selected objects, or ``[]`` when *s3_path* is falsy
        or an S3 connection/client error occurs.
    """
    if not s3_path:
        return []
    if context is None:
        context = {}
    # Listing is driven by the loop below, so the whole scan stays inside
    # the try block rather than only the client construction.
    try:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=S3_ACCESS_KEY,
            aws_secret_access_key=S3_SECRET,
            region_name=S3_REGION,
        )
        keys = []
        for obj_summary in _get_s3_objects(s3_path):
            response = s3_client.head_object(
                Bucket=obj_summary.bucket_name, Key=obj_summary.key
            )
            metadata = response["Metadata"]
            print("------")
            print(obj_summary.key)
            print(f"Metadata: {metadata}")
            print("\n")
            is_match = metadata.get(metadata_key) == metadata_value_check
            if is_match == want_match:
                keys.append(obj_summary.key)
        return keys
    except (EndpointConnectionError, ClientError) as err:
        # Best-effort: keep returning an empty list, but record why the scan
        # failed instead of discarding the exception.
        LOG.warning(
            "request_id=%s: unable to check S3 metadata under %r (context=%s): %s",
            request_id,
            s3_path,
            context,
            err,
        )
        print("ERROR!")
        return []


def get_s3_objects_not_matching_metadata(
    request_id, s3_path, *, metadata_key, metadata_value_check, context=None
) -> list[str]:
    """Return keys under *s3_path* whose metadata value differs from the check.

    An object is included when the value stored under *metadata_key* is not
    equal to *metadata_value_check*, which includes objects that lack the
    key altogether. Returns ``[]`` for a falsy *s3_path* or when an S3
    connection/client error occurs.
    """
    return _filter_s3_keys_by_metadata(
        request_id,
        s3_path,
        metadata_key=metadata_key,
        metadata_value_check=metadata_value_check,
        want_match=False,
        context=context,
    )


def get_s3_objects_matching_metadata(
    request_id, s3_path, *, metadata_key, metadata_value_check, context=None
) -> list[str]:
    """Return keys under *s3_path* whose metadata value equals the check.

    An object is included when the value stored under *metadata_key* is
    equal to *metadata_value_check*. Returns ``[]`` for a falsy *s3_path*
    or when an S3 connection/client error occurs.
    """
    return _filter_s3_keys_by_metadata(
        request_id,
        s3_path,
        metadata_key=metadata_key,
        metadata_value_check=metadata_value_check,
        want_match=True,
        context=context,
    )
# Manual check 1: list the keys under the sample prefix whose "test-key"
# metadata value is anything other than '1'.
print("TEST NOT MATCHING")
results = get_s3_objects_not_matching_metadata(
    1, path_in_bucket, metadata_key="test-key", metadata_value_check="1"
)
print(results)

# Manual check 2: list the keys under the same prefix whose "test-key"
# metadata value is exactly '1'.
print("TEST MATCHING")
results = get_s3_objects_matching_metadata(
    1, path_in_bucket, metadata_key="test-key", metadata_value_check="1"
)
print(results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment