import os
import csv
import boto3
from concurrent.futures import ThreadPoolExecutor, as_completed
from FjaPb import FjaPb  # Assuming this is the correct import for FjaPb

# Load AWS credentials and target bucket details from environment variables
ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
BUCKET_URL = os.getenv("S3_BUCKET_URL")
PREFIX = 'your_prefix_here'  # This should be replaced with your actual prefix

# Initialize a boto3 session
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
s3 = session.resource('s3')
bucket = s3.Bucket(BUCKET_URL)

# File to keep track of processed files
PROCESSED_FILES_RECORD = f"{PREFIX}_processed_files.txt"

# Function to read processed files from record
def read_processed_files():
    if os.path.exists(PROCESSED_FILES_RECORD):
        with open(PROCESSED_FILES_RECORD, 'r') as file:
            return set(file.read().splitlines())
    return set()

# Function to append processed file to the record
def record_processed_file(file_key):
    with open(PROCESSED_FILES_RECORD, 'a') as file:
        file.write(f"{file_key}\n")

# Function to process each file
def process_file(file_key):
    obj = bucket.Object(file_key)
    response = obj.get()
    file_content = response['Body'].read()

    # Initialize FjaPb object and parse content
    fja_pb = FjaPb()
    fja_pb.ParseFromString(file_content)

    # Access the required properties
    product_count = fja_pb.product_count
    rating_group_count = fja_pb.rating_group_count

    # Record the file as processed
    record_processed_file(file_key)

    # Return a tuple of the file key and extracted properties
    return (file_key, product_count, rating_group_count)

# Main function to orchestrate the concurrent processing
def main():
    processed_files = read_processed_files()
    files_to_process = [
        file.key
        for file in bucket.objects.filter(Prefix=PREFIX)
        if file.key.endswith('.bom') and file.key not in processed_files
    ]
    results = []

    # Set up ThreadPoolExecutor to manage concurrency
    with ThreadPoolExecutor(max_workers=50) as executor:
        # Submit tasks to the executor
        future_to_file = {executor.submit(process_file, file_key): file_key for file_key in files_to_process}
        # As each future completes, collect its result
        for future in as_completed(future_to_file):
            results.append(future.result())

    # Write results to CSV file, only if there are results to write
    if results:
        csv_filename = f"{PREFIX}.csv"
        with open(csv_filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['File Key', 'Product Count', 'Rating Group Count'])
            for result in results:
                writer.writerow(result)
        print(f"Data written to {csv_filename}")

if __name__ == "__main__":
    main()
This is great. Change the program so that, if it is terminated early and then resumed, it will not need to reprocess any files.
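One way to approach this, as a minimal sketch rather than a drop-in patch: the script above already skips keys listed in {PREFIX}_processed_files.txt on startup, but because the CSV is only written at the very end, a run that is killed partway through leaves files marked as processed without their counts ever being saved. A possible fix is to persist each result as soon as its future completes, appending the CSV row and the processed-file marker together under a lock, and to remove the record_processed_file call from process_file so a file is only marked processed once its row is on disk. The sketch below assumes the same PREFIX, bucket, read_processed_files, and process_file defined above (with process_file simply returning its tuple and no longer recording anything itself); persist_lock, persist_result, and CSV_FILENAME are names introduced here for illustration.

import csv
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# Reuses PREFIX, bucket, read_processed_files, and process_file from the gist
# above; process_file is assumed to no longer call record_processed_file and to
# just return (file_key, product_count, rating_group_count).
PROCESSED_FILES_RECORD = f"{PREFIX}_processed_files.txt"
CSV_FILENAME = f"{PREFIX}.csv"
persist_lock = threading.Lock()

def persist_result(file_key, product_count, rating_group_count):
    # Append the CSV row and the processed-file marker in one locked step, so a
    # file is never marked processed without its counts being on disk.
    with persist_lock:
        write_header = not os.path.exists(CSV_FILENAME)
        with open(CSV_FILENAME, 'a', newline='') as csvfile:
            writer = csv.writer(csvfile)
            if write_header:
                writer.writerow(['File Key', 'Product Count', 'Rating Group Count'])
            writer.writerow([file_key, product_count, rating_group_count])
        with open(PROCESSED_FILES_RECORD, 'a') as record:
            record.write(f"{file_key}\n")

def main():
    processed_files = read_processed_files()
    files_to_process = [
        obj.key
        for obj in bucket.objects.filter(Prefix=PREFIX)
        if obj.key.endswith('.bom') and obj.key not in processed_files
    ]
    with ThreadPoolExecutor(max_workers=50) as executor:
        future_to_file = {executor.submit(process_file, key): key for key in files_to_process}
        for future in as_completed(future_to_file):
            # Persist incrementally: if the run dies here, only in-flight files
            # are lost, and they will be retried on the next run.
            persist_result(*future.result())

if __name__ == "__main__":
    main()

With this arrangement, re-running the script after an interruption only fetches objects whose keys are not yet in the record file, and the CSV grows across runs instead of being rewritten from scratch.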