Last active
November 7, 2023 21:53
-
-
Save allancaffee/f43d20828ab6d92e411171ce98a90505 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import itertools | |
import re | |
import sys | |
from base64 import b64encode | |
from hashlib import sha256 | |
import phonenumbers | |
import pytz | |
from csvkit import DictReader, DictWriter | |
from dateutil.parser import parse as parse_date | |
# Maps input CSV column names -> output report column names for the fields
# that appear once per order. Email/Phone/TimeOfSale values get extra
# normalization in process_row_group before being written.
FIXED_ROW_MAPPING = {
    "web_order_id": "TransactionID",
    "order_time": "TimeOfSale",
    "order_total": "Amount",
    "email": "Email",
    "phone": "Phone",
}
# Timestamps in the partner's feed are local to this zone
# (assumed — TODO confirm with the partner's spec).
PARTNER_TIMEZONE = pytz.timezone("US/Pacific")
# Splits an address into (local part before any "+tag", domain after the
# last "@"); used to canonicalize Gmail addresses in normalize_email.
ADDRESS_WITH_OPTIONAL_TAGS_PATTERN = re.compile("^([^+]*)(?:[+].*)?@(.*)$")
def normalize_email(email: str) -> str:
    """Canonicalize an email address.

    Trims whitespace and lowercases. For @gmail.com addresses, additionally
    strips any "+tag" suffix and all dots from the local part, both of which
    Gmail ignores for delivery.

    Raises ValueError if a gmail address cannot be split on "@".
    """
    address = email.strip().lower()
    if not address.endswith("@gmail.com"):
        return address
    # Local part up to an optional "+tag", then the domain after the last "@".
    parts = re.match(r"^([^+]*)(?:[+].*)?@(.*)$", address)
    if parts is None:
        raise ValueError(f"Could not parse email address: {email!r}")
    local, domain = parts.group(1), parts.group(2)
    local = local.replace(".", "")
    return local + "@" + domain
def normalize_phone_number(phone_number: str) -> str:
    """Return *phone_number* in E.164 format (e.g. "+14155551234").

    Numbers are parsed with a US default region for inputs that lack a
    country code.

    Raises ValueError when the number cannot be parsed or is parseable but
    not a valid number.
    """
    try:
        parsed_number = phonenumbers.parse(phone_number, "US")
    except phonenumbers.NumberParseException as ex:
        # Chain the library error (``from ex``) so the underlying parse
        # failure is preserved in tracebacks; the original raise dropped it.
        raise ValueError(f"Could not parse phone number: {phone_number!r}") from ex
    if not phonenumbers.is_valid_number(parsed_number):
        raise ValueError(f"Could not normalize phone number: {phone_number!r}")
    # Format the phone number in E.164 format
    return phonenumbers.format_number(
        parsed_number, phonenumbers.PhoneNumberFormat.E164
    )
def salt_and_hash(contact: str) -> str:
    """Append the fixed "+curated" salt, SHA-256 the result, and return the
    digest as a base64 text string."""
    salted_bytes = (contact + "+curated").encode()
    return b64encode(sha256(salted_bytes).digest()).decode()
def convert_to_utc_timestamp(date_str: str, starting_tz: pytz.timezone) -> str:
    """Parse a naive timestamp string, interpret it in *starting_tz*, and
    return it as an ISO-8601 string converted to UTC.

    Bug fix: the previous implementation attached the zone with
    ``replace(tzinfo=pytz_zone)``, which pytz documents as incorrect — it
    uses the zone's LMT offset (e.g. -07:53 for US/Pacific) instead of the
    proper PST/PDT offset — and it never converted to UTC despite the
    function's name.
    """
    parsed = parse_date(date_str)
    if hasattr(starting_tz, "localize"):
        # pytz zones must be attached via localize() so DST is resolved.
        aware = starting_tz.localize(parsed)
    else:
        # Plain fixed-offset tzinfo instances are safe to attach directly.
        aware = parsed.replace(tzinfo=starting_tz)
    return aware.astimezone(pytz.utc).isoformat()
def _hashed_contact_or_blank(raw_value: str, normalizer) -> str:
    """Normalize *raw_value* with *normalizer*, then salt-and-hash it.

    A ValueError from normalization is reported to stderr and an empty
    string is returned, so one bad contact field does not abort the export.
    """
    try:
        return salt_and_hash(normalizer(raw_value))
    except ValueError as ex:
        print(f"{ex} leaving field blank.", file=sys.stderr)
        return ""


def process_row_group(row_group: list[dict]) -> dict:
    """Flatten one order's rows into a single output row.

    The first row supplies the order-level fields mapped by
    FIXED_ROW_MAPPING; every row in the group contributes one numbered
    Product{i}_* column set.

    Raises KeyError when a required input column is missing.
    """
    lead_row = row_group[0]
    new_row = {}
    for source_field, target_field in FIXED_ROW_MAPPING.items():
        if target_field == "Email":
            new_row[target_field] = _hashed_contact_or_blank(
                lead_row[source_field], normalize_email
            )
        elif target_field == "Phone":
            new_row[target_field] = _hashed_contact_or_blank(
                lead_row[source_field], normalize_phone_number
            )
        elif target_field == "TimeOfSale":
            new_row[target_field] = convert_to_utc_timestamp(
                lead_row[source_field], PARTNER_TIMEZONE
            )
        else:
            new_row[target_field] = lead_row[source_field]
    # Now for every row in the group, we extract the product details.
    for i, row in enumerate(row_group, 1):
        new_row[f"Product{i}_Brand"] = row["brand"]
        # We're mapping the "code" to MPN. It's showing up in the event data as
        # the GTIN, but it matches the MPN in the product feed. We should update
        # the event data ASAP so that we can match these up.
        new_row[f"Product{i}_MPN"] = row["code"]
        new_row[f"Product{i}_Subtotal"] = row["line subtotal"]
        new_row[f"Product{i}_Quantity"] = row["items"]
        # Constant value, but deliberately assigned inside the loop: dict
        # insertion order determines the output CSV column order, and this
        # keeps Currency in its established position (after Product1_*).
        new_row["Currency"] = "USD"
    return new_row
def grouped_rows(reader):
    """Yield each run of consecutive rows sharing an ``order_id``, as a list.

    NOTE(review): itertools.groupby only groups *adjacent* rows, so this
    assumes the input is already sorted/clustered by order_id — confirm
    upstream ordering.
    """
    grouped = itertools.groupby(reader, key=lambda row: row["order_id"])
    return (list(rows) for _key, rows in grouped)
def main(input_file, output_file):
    """Read partner order rows from *input_file* (CSV) and write the
    flattened, one-row-per-order report to *output_file*.

    Raises ValueError when the input is missing a required column.
    """
    reader = DictReader(input_file)
    all_field_names = []
    all_rows = []
    for row_group in grouped_rows(reader):
        try:
            processed_row = process_row_group(row_group)
        except KeyError as ex:
            # Chain the KeyError (``from ex``) so the missing column shows
            # up in tracebacks; the original raise discarded the cause.
            raise ValueError(f"Input data is missing required field: {ex}") from ex
        # Grab the longest list of field names. This is the simplest way to get
        # all of the field names while preserving order (groups with more
        # products have strictly more columns, in the same insertion order).
        if len(processed_row) > len(all_field_names):
            # Snapshot the keys; the original kept a live dict view here.
            all_field_names = list(processed_row)
        all_rows.append(processed_row)
    writer = DictWriter(output_file, all_field_names)
    writer.writeheader()
    writer.writerows(all_rows)
if __name__ == "__main__": | |
try: | |
with open(sys.argv[1], "r") as input_file: | |
with open(sys.argv[2], "w") as output_file: | |
main(input_file, output_file) | |
except ValueError as ex: | |
print(str(ex), file=sys.stderr) | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment