Skip to content

Instantly share code, notes, and snippets.

@corey-cole
Created April 14, 2024 21:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save corey-cole/d537b08e3a426f7884f305f845737514 to your computer and use it in GitHub Desktop.
Save corey-cole/d537b08e3a426f7884f305f845737514 to your computer and use it in GitHub Desktop.
Get the manifest of the latest S3 inventory for a given bucket/inventory config
import json
from datetime import datetime, timedelta, timezone
from pprint import pprint
from typing import Optional, Tuple, TYPE_CHECKING
import boto3
from botocore.exceptions import ClientError
# The S3Client type is imported only for static type checkers; at runtime the
# name falls back to `object` so the annotation below still resolves.
if not TYPE_CHECKING:
    S3Client = object
else:
    from mypy_boto3_s3.client import S3Client

# Module-level S3 client shared by the functions in this file.
client: S3Client = boto3.client('s3')
# If the bucket inventory is being stored cross-account, call this from the account holding the
# target bucket. Granting 'GetInventoryConfiguration' to arbitrary IAM accounts in your bucket policy
# will raise fewer eyebrows than granting arbitrary cross-account S3 reads.
def get_bucket_inventory_configuration_manifest(
    bucket_name: str,
    inventory_id: str,
    *,
    s3_client: Optional["S3Client"] = None,
) -> Optional[Tuple[str, str]]:
    """Locate the most recent S3 inventory manifest for a bucket/inventory configuration.

    Args:
        bucket_name (str): Name of the bucket for which we want the latest inventory manifest.
        inventory_id (str): The ID used to identify the inventory configuration.
        s3_client (Optional[S3Client]): S3 client used for the lookups. Defaults to the
            module-level ``client`` when omitted.

    Returns:
        Optional[Tuple[str, str]]: Tuple of S3 bucket and key of the latest inventory manifest.
            None if the inventory configuration is disabled or no manifest was delivered
            inside the look-back window (1 day, or 8 days for a weekly schedule).

    Raises:
        botocore.exceptions.ClientError: If the inventory ID is not valid for the given bucket
            (Error Code 'NoSuchConfiguration').
    """
    s3 = client if s3_client is None else s3_client

    # Allow this to raise an exception if the inventory ID is not valid for the given bucket
    config = s3.get_bucket_inventory_configuration(
        Bucket=bucket_name,
        Id=inventory_id,
    )['InventoryConfiguration']

    # If the configuration is disabled, then exit early.
    # It's possible that there is a recent inventory, but this is an executive decision to just bail.
    if not config['IsEnabled']:
        return None

    destination = config['Destination']['S3BucketDestination']

    # Convert inventory bucket ARN to a name (the text after the last ':')
    inventory_bucket_name = destination['Bucket'].rsplit(':', 1)[-1]

    prefix = f"{bucket_name}/{inventory_id}/"
    # If there is a destination prefix, then it needs to be prepended to the overall destination
    if 'Prefix' in destination:
        prefix = f"{destination['Prefix']}/{prefix}"

    # Assume a daily inventory (look back 1 day); a weekly inventory needs an 8 day look-back
    lookback_days = 8 if config['Schedule']['Frequency'] == 'Weekly' else 1
    window_start = datetime.now(timezone.utc) - timedelta(days=lookback_days)

    # We don't know the hour at which the inventory was delivered, so limit ourselves to a day
    start_key_fragment = window_start.strftime('%Y-%m-%dT')
    listing = s3.list_objects_v2(
        Bucket=inventory_bucket_name,
        Prefix=prefix,
        StartAfter=f"{prefix}{start_key_fragment}",
    )

    # The listing carries no 'Contents' entry when nothing matched, so default to an
    # empty list instead of raising KeyError.
    candidates = [
        obj
        for obj in listing.get('Contents', [])
        if obj['Key'].endswith('manifest.json') and obj['LastModified'] >= window_start
    ]

    # If the list is empty, then there were no deliveries in the time window
    if not candidates:
        return None

    # The most recently modified manifest wins
    latest = max(candidates, key=lambda obj: obj['LastModified'])
    return inventory_bucket_name, latest['Key']
def main():
    """Look up the latest inventory manifest for a sample bucket and pretty-print it.

    Prints a message and returns without fetching anything when the inventory
    configuration does not exist or when no manifest location was returned.
    Any other ClientError from the lookup is re-raised.
    """
    bucket_name = 'alb-access-logs-123456789012-us-east-1'
    inventory_id = 'plover'
    try:
        manifest_location = get_bucket_inventory_configuration_manifest(bucket_name, inventory_id)
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchConfiguration':
            print(f"No inventory configuration found for {bucket_name} and {inventory_id}")
            return
        raise

    # The lookup returns None (not a tuple) when there is nothing to report;
    # check before unpacking to avoid a TypeError.
    if manifest_location is None:
        print(f"No recent inventory manifest found for {bucket_name} and {inventory_id}")
        return

    manifest_bucket, manifest_key = manifest_location
    print(f"Latest manifest available at s3://{manifest_bucket}/{manifest_key}")
    response = client.get_object(
        Bucket=manifest_bucket,
        Key=manifest_key,
    )
    latest_manifest = json.loads(response['Body'].read())
    pprint(latest_manifest)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment