Skip to content

Instantly share code, notes, and snippets.

@microft
Created April 16, 2024 16:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save microft/48763a55d90620dc0d7526ac47e24cbe to your computer and use it in GitHub Desktop.
Save microft/48763a55d90620dc0d7526ac47e24cbe to your computer and use it in GitHub Desktop.
import json
import sys
import boto3
def create_empty_file(bucket_name, key):
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket_name, Key=key, Body=b'')
def revert_to_previous_version(bucket_name, key):
s3 = boto3.client('s3')
# Get the current version of the object
response = s3.head_object(Bucket=bucket_name, Key=key)
current_version_id = response['VersionId']
# List all versions of the object
versions = s3.list_object_versions(Bucket=bucket_name, Prefix=key)['Versions']
# Find the previous version (excluding the current one)
previous_version = None
for version in versions:
if version['VersionId'] != current_version_id:
previous_version = version
break
# Revert to the previous version
if previous_version:
s3.copy_object(
Bucket=bucket_name,
CopySource={'Bucket': bucket_name, 'Key': key, 'VersionId': previous_version['VersionId']},
Key=key
)
print(f"Object '{key}' reverted to version '{previous_version['VersionId']}'")
else:
print(f"No previous version found for object '{key}'")
def read_ndjson(file_path):
# List to store parsed JSON objects
json_objects = []
# Open the ndjson file for reading
with open(file_path, 'r', encoding='utf-8') as file:
# Iterate through each line in the file
for line in file:
# Parse each line as JSON and append to the list
#json_objects.append(json.loads(line.strip()))
email = json.loads(line.strip())
for att in email.get("attachments", []):
size = int(att.get("fileInfo", {}).get("contentLength", 0))
key = att.get("fileInfo", {}).get('location', {})['key']
bucket = att.get("fileInfo", {}).get('location', {})['bucket']
if size > 40000000:
#print(size, bucket, key)
#create_empty_file(bucket, key)
revert_to_previous_version(bucket, key)
return json_objects
def main():
# Path to the ndjson file
file_path = sys.argv[1]
# Call the function to read ndjson file
json_objects = read_ndjson(file_path)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment