@recalde
Last active March 4, 2024 17:14
import os
import csv
import threading
import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed

from FjaPb import FjaPb  # Assuming this is the correct import for FjaPb

# Load AWS credentials and target bucket details from environment variables
ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
BUCKET_URL = os.getenv("S3_BUCKET_URL")  # s3.Bucket() expects the bucket name, not a full URL
PREFIX = 'your_prefix_here'  # Replace with your actual key prefix

# Initialize a boto3 session and the target bucket
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
s3 = session.resource('s3')
bucket = s3.Bucket(BUCKET_URL)

# File to keep track of processed files, so an interrupted run can be resumed
PROCESSED_FILES_RECORD = f"{PREFIX}_processed_files.txt"

# Lock to serialize appends to the record file from worker threads
record_lock = threading.Lock()

# Function to read processed files from the record
def read_processed_files():
    if os.path.exists(PROCESSED_FILES_RECORD):
        with open(PROCESSED_FILES_RECORD, 'r') as file:
            return set(file.read().splitlines())
    return set()

# Function to append a processed file to the record (thread-safe)
def record_processed_file(file_key):
    with record_lock:
        with open(PROCESSED_FILES_RECORD, 'a') as file:
            file.write(f"{file_key}\n")

# Function to process each file
def process_file(file_key):
    obj = bucket.Object(file_key)
    response = obj.get()
    file_content = response['Body'].read()

    # Initialize FjaPb object and parse content
    fja_pb = FjaPb()
    fja_pb.ParseFromString(file_content)

    # Access the required properties
    product_count = fja_pb.product_count
    rating_group_count = fja_pb.rating_group_count

    # Record the file as processed so it is skipped if the run is resumed
    record_processed_file(file_key)

    # Return a tuple of the file key and extracted properties
    return (file_key, product_count, rating_group_count)

# Main function to orchestrate the concurrent processing
def main():
    processed_files = read_processed_files()
    files_to_process = [
        file.key
        for file in bucket.objects.filter(Prefix=PREFIX)
        if file.key.endswith('.bom') and file.key not in processed_files
    ]
    results = []

    # Set up ThreadPoolExecutor to manage concurrency
    with ThreadPoolExecutor(max_workers=50) as executor:
        # Submit tasks to the executor
        future_to_file = {executor.submit(process_file, file_key): file_key for file_key in files_to_process}
        # As each future completes, collect its result
        for future in as_completed(future_to_file):
            results.append(future.result())

    # Write results to a CSV file, only if there are results to write
    if results:
        csv_filename = f"{PREFIX}.csv"
        with open(csv_filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['File Key', 'Product Count', 'Rating Group Count'])
            for result in results:
                writer.writerow(result)
        print(f"Data written to {csv_filename}")

if __name__ == "__main__":
    main()
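One possible hardening, not in the gist as posted: catch per-file exceptions in the collection loop so a single unreadable or unparsable .bom object does not abort the whole run. Because process_file records a key only after a successful parse, failed keys stay out of the record and are retried on the next run. A minimal sketch of the revised loop:

        # As each future completes, collect its result; report failures and move on
        for future in as_completed(future_to_file):
            file_key = future_to_file[future]
            try:
                results.append(future.result())
            except Exception as exc:
                print(f"Failed to process {file_key}: {exc}")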
@recalde (Author) commented Feb 28, 2024

This is great. Change the program so that if it is terminated early and then resumed, it does not need to reprocess any files.
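The version of the script above already does this: every successfully processed key is appended to {PREFIX}_processed_files.txt, and main() filters those keys out before submitting work, so a resumed run skips them. One way to go further, sketched under the assumption that appending to the output CSV incrementally is acceptable: write each result row as soon as its future completes, so rows gathered before an early termination are not lost with the in-memory results list. The helper append_result_row below is hypothetical, not part of the gist.

import csv
import os

def append_result_row(csv_filename, row):
    # Hypothetical helper: append one row, writing the header only when the file is new
    write_header = not os.path.exists(csv_filename)
    with open(csv_filename, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if write_header:
            writer.writerow(['File Key', 'Product Count', 'Rating Group Count'])
        writer.writerow(row)

# In main(), instead of collecting into `results` and writing at the end:
#     for future in as_completed(future_to_file):
#         append_result_row(f"{PREFIX}.csv", future.result())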
