Created
February 13, 2024 18:49
-
-
Save JeffreyMFarley/68bb0861173b0a4ba339fd660c905b0a to your computer and use it in GitHub Desktop.
This script will compare the headers of all CSV files in a specified S3 bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://chat.openai.com/c/9d7c6f7d-2dbf-4d29-a2d2-364c6750d476 | |
import csv
from collections import Counter, OrderedDict
from datetime import datetime, timezone
from io import StringIO

import boto3
import pytz
# Scan configuration. "foo" / "bar" are placeholders — set the real bucket
# name, key prefix, and earliest LastModified date (YYYY-MM-DD) before running.
BUCKET_NAME = "foo"
START_PATH = "bar"
START_DATE = "2020-01-01"
# -----------------------------------------------------------------------------
# Initialize S3 client
# Module-level client shared by all functions below; credentials come from
# the environment (NOTE(review): default boto3 credential chain — confirm).
s3_client = boto3.client("s3")
def list_csv_files(bucket_name, start_date_str, start_path):
    """
    List all CSV object keys in the S3 bucket that were last modified on or
    after the start date and whose key begins with the start path.

    Args:
        bucket_name: Name of the S3 bucket to scan.
        start_date_str: Inclusive lower bound on LastModified, "YYYY-MM-DD".
        start_path: Key prefix restricting the listing.

    Returns:
        List of matching object keys, in the order S3 returns them.
    """
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    # S3's LastModified is timezone-aware; make the bound aware (UTC) so the
    # comparison below does not raise TypeError.
    start_date = start_date.replace(tzinfo=timezone.utc)

    csv_files = []
    paginator = s3_client.get_paginator("list_objects_v2")

    print(f"Listing CSV files in {bucket_name}/{start_path} after {start_date_str}...")
    # Iterate the paginator directly. The original looped over
    # build_full_result(), which returns one merged dict — iterating it yields
    # its *keys* (strings), so page.get("Contents") crashed with AttributeError.
    for i, page in enumerate(
        paginator.paginate(Bucket=bucket_name, Prefix=start_path),
        start=1,
    ):
        print(f" Page {i}\r", end="", flush=True)
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".csv") and obj["LastModified"] >= start_date:
                csv_files.append(obj["Key"])
    print()
    return csv_files
def read_csv_header(s3_client, bucket_name, file_key):
    """Read the header row of a CSV file stored in S3.

    Only the first 10,000 bytes are fetched — assumed to cover the header row
    (headers longer than that would be silently truncated).

    Args:
        s3_client: A boto3 S3 client (passed in; not the module-level one).
        bucket_name: Bucket containing the object.
        file_key: Key of the CSV object.

    Returns:
        The first CSV row as a list of column-name strings.
    """
    # Ranged GET avoids downloading the whole object just for the header.
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_key, Range="bytes=0-9999")
    # errors="replace" guards against the 10,000-byte boundary splitting a
    # multi-byte UTF-8 sequence, which would otherwise raise UnicodeDecodeError.
    csv_string = obj["Body"].read().decode("utf-8", errors="replace")
    csv_reader = csv.reader(StringIO(csv_string))
    return next(csv_reader)
def compare_csv_headers(bucket_name, start_date_str, start_path):
    """Compare the header of each CSV with the previous one and report changes.

    Walks the matching CSV files in listing order and records every file whose
    header differs from the immediately preceding file's header. Then prints:
      * the columns common to all changed headers,
      * per changed file: duplicate columns (if any) and the columns added
        relative to the previous changed header.
    """
    csv_files = list_csv_files(bucket_name, start_date_str, start_path)
    print(f"Found {len(csv_files)} CSV files")

    previous_header = None
    changed_headers = OrderedDict()
    for i, file_key in enumerate(csv_files, start=1):
        # Throttle progress output on large listings; print every file when small.
        if i % 10 == 0 or len(csv_files) < 40:
            print(
                f"{i} / {len(csv_files)}\tChanges: {len(changed_headers)}\r",
                end="",
                flush=True,
            )
        current_header = read_csv_header(s3_client, bucket_name, file_key)
        if current_header != previous_header:
            changed_headers[file_key] = current_header
        previous_header = current_header
    print()

    # Guard: with no matching files, ordered_headers[0] below would raise
    # IndexError — bail out with a message instead.
    if not changed_headers:
        print("Common headers:")
        print([])
        return

    ordered_headers = list(changed_headers.values())

    # Intersect every changed header to find the columns they all share.
    common_header = set(ordered_headers[0])
    for header in ordered_headers:
        common_header &= set(header)

    print("Common headers:")
    print(sorted(common_header))

    # Show how each changed header differs from its predecessor.
    print("\nChanged headers:")
    for index, (file_key, header) in enumerate(changed_headers.items()):
        print(file_key)
        if len(set(header)) < len(header):
            print("\tDuplicate columns")
            duplicates = [item for item, count in Counter(header).items() if count > 1]
            print(f"\t\t{duplicates}")
        if index > 0:
            # NOTE: only columns *added* vs the previous changed header are
            # listed; removed columns are not reported.
            print("\tDifference from previous header:")
            print(f"\t{list(set(header) - set(ordered_headers[index - 1]))}\n")
# -----------------------------------------------------------------------------
# Main — guard the entry point so importing this module does not trigger a
# full bucket scan.
if __name__ == "__main__":
    compare_csv_headers(BUCKET_NAME, START_DATE, START_PATH)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment