Skip to content

Instantly share code, notes, and snippets.

@JeffreyMFarley
Created February 13, 2024 18:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JeffreyMFarley/68bb0861173b0a4ba339fd660c905b0a to your computer and use it in GitHub Desktop.
Save JeffreyMFarley/68bb0861173b0a4ba339fd660c905b0a to your computer and use it in GitHub Desktop.
This script compares the headers of all CSV files in a specified S3 bucket and reports where the header row changes from one file to the next.
# https://chat.openai.com/c/9d7c6f7d-2dbf-4d29-a2d2-364c6750d476
import csv
from collections import Counter, OrderedDict
from datetime import datetime, timezone
from io import StringIO

import boto3
import pytz
# --- Configuration: edit these placeholders before running ---
BUCKET_NAME = "foo"  # S3 bucket to scan
START_PATH = "bar"  # key prefix to restrict the listing to
START_DATE = "2020-01-01"  # only objects modified on/after this date (YYYY-MM-DD)
# -----------------------------------------------------------------------------
# Initialize S3 client (uses default credential chain / region)
s3_client = boto3.client("s3")
def list_csv_files(bucket_name, start_date_str, start_path):
    """List the keys of all CSV files in the bucket under ``start_path``
    whose LastModified timestamp is on or after ``start_date_str``.

    Args:
        bucket_name: Name of the S3 bucket to scan.
        start_date_str: Inclusive lower bound on LastModified, "YYYY-MM-DD".
        start_path: Key prefix to restrict the listing to.

    Returns:
        list[str]: Matching object keys, in S3 listing (lexicographic) order.
    """
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    # S3 LastModified is timezone-aware (UTC); make the cutoff aware too so
    # the comparison below doesn't raise TypeError.
    start_date = start_date.replace(tzinfo=timezone.utc)

    csv_files = []
    paginator = s3_client.get_paginator("list_objects_v2")

    print(f"Listing CSV files in {bucket_name}/{start_path} after {start_date_str}...")
    # BUG FIX: the original called .build_full_result(), which returns a single
    # merged dict — iterating it yields its string keys, not pages, so
    # page.get(...) would fail. Iterate the PageIterator directly instead.
    for i, page in enumerate(
        paginator.paginate(Bucket=bucket_name, Prefix=start_path),
        start=1,
    ):
        print(f" Page {i}\r", end="", flush=True)
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".csv") and obj["LastModified"] >= start_date:
                csv_files.append(obj["Key"])
    print()

    return csv_files
def read_csv_header(s3_client, bucket_name, file_key):
    """Read the header (first) row of a CSV file stored in S3.

    Only the first 10,000 bytes are fetched via an HTTP Range request, which
    is assumed to be enough to contain the full header row.

    Args:
        s3_client: A boto3 S3 client.
        bucket_name: Bucket containing the file.
        file_key: Key of the CSV object.

    Returns:
        list[str]: The parsed header row.
    """
    obj = s3_client.get_object(Bucket=bucket_name, Key=file_key, Range="bytes=0-9999")
    # BUG FIX: the Range fetch can truncate a multibyte UTF-8 character at the
    # 10,000-byte boundary, which made a strict decode raise UnicodeDecodeError.
    # errors="replace" keeps the decode safe; any replacement character can
    # only appear at the truncated tail, past the header row we consume.
    csv_string = obj["Body"].read().decode("utf-8", errors="replace")
    csv_reader = csv.reader(StringIO(csv_string))
    header = next(csv_reader)
    return header
def compare_csv_headers(bucket_name, start_date_str, start_path):
    """Compare the header of each CSV with the previous one and print changes.

    Walks every matching CSV in listing order, records each file whose header
    differs from the immediately preceding file's header, then prints the
    columns common to all recorded header variants plus, per variant, any
    duplicate columns and the columns added relative to the previous variant.

    Args:
        bucket_name: S3 bucket to scan.
        start_date_str: Inclusive LastModified lower bound, "YYYY-MM-DD".
        start_path: Key prefix to restrict the listing to.
    """
    csv_files = list_csv_files(bucket_name, start_date_str, start_path)
    print(f"Found {len(csv_files)} CSV files")

    previous_header = None
    changed_headers = OrderedDict()

    for i, file_key in enumerate(csv_files, start=1):
        # Progress: every 10th file, or every file when the list is short.
        if i % 10 == 0 or len(csv_files) < 40:
            print(
                f"{i} / {len(csv_files)}\tChanges: {len(changed_headers)}\r",
                end="",
                flush=True,
            )
        current_header = read_csv_header(s3_client, bucket_name, file_key)
        if current_header != previous_header:
            changed_headers[file_key] = current_header
        previous_header = current_header
    print()

    # BUG FIX: the original indexed ordered_headers[0] unconditionally and
    # raised IndexError when no CSV files matched.
    if not changed_headers:
        print("No CSV headers found")
        return

    ordered_headers = list(changed_headers.values())

    # Intersect all recorded headers to find the columns present in every one.
    common_header = set(ordered_headers[0])
    for header in ordered_headers[1:]:
        common_header &= set(header)

    print("Common headers:")
    print(sorted(common_header))

    # Show how each changed header differs from its predecessor.
    print("\nChanged headers:")
    for index, (file_key, header) in enumerate(changed_headers.items()):
        print(file_key)

        if len(set(header)) < len(header):
            print("\tDuplicate columns")
            duplicates = [item for item, count in Counter(header).items() if count > 1]
            print(f"\t\t{duplicates}")

        if index > 0:
            # NOTE(review): this lists only columns *added* vs. the previous
            # variant; removed columns are not reported.
            print("\tDifference from previous header:")
            print(f"\t{list(set(header) - set(ordered_headers[index - 1]))}\n")
# -----------------------------------------------------------------------------
# Main — guard so the comparison only runs when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    compare_csv_headers(BUCKET_NAME, START_DATE, START_PATH)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment