Skip to content

Instantly share code, notes, and snippets.

@allancaffee
Last active November 7, 2023 21:53
Show Gist options
  • Save allancaffee/f43d20828ab6d92e411171ce98a90505 to your computer and use it in GitHub Desktop.
Save allancaffee/f43d20828ab6d92e411171ce98a90505 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import itertools
import re
import sys
from base64 import b64encode
from datetime import timezone, tzinfo
from hashlib import sha256

import phonenumbers
import pytz
from csvkit import DictReader, DictWriter
from dateutil.parser import parse as parse_date
# Order-level columns read from the first row of each order group, mapped
# from input column name to output column name. Insertion order matters: it
# is the order in which these columns appear in each output row (and so in
# the output CSV header).
FIXED_ROW_MAPPING = dict(
    web_order_id="TransactionID",
    order_time="TimeOfSale",
    order_total="Amount",
    email="Email",
    phone="Phone",
)
# Time zone passed as the starting zone when converting the partner's
# order_time values. "America/Los_Angeles" is the canonical IANA name;
# "US/Pacific" is only a deprecated backward-compatibility alias for it.
PARTNER_TIMEZONE = pytz.timezone("America/Los_Angeles")
# Splits an email address into (local part without any "+tag", domain).
# The adjacent literals concatenate to the single pattern
# "^([^+]*)(?:[+].*)?@(.*)$".
ADDRESS_WITH_OPTIONAL_TAGS_PATTERN = re.compile(
    "^"
    "([^+]*)"  # group 1: local part, containing no "+"
    "(?:[+].*)?"  # optional "+tag" suffix, not captured
    "@(.*)$"  # group 2: domain
)
def normalize_email(email: str) -> str:
    """Return a canonical form of *email* suitable for hashing and matching.

    The address is stripped of surrounding whitespace and lower-cased. For
    addresses ending in ``@gmail.com``, any ``+tag`` suffix and every dot in
    the local part are removed as well, so variants of the same Gmail mailbox
    normalize to one value.

    Raises:
        ValueError: if *email* is missing/blank or contains no ``@``, or if a
            Gmail address cannot be split into local part and domain.
    """
    # ``or ""`` maps a missing CSV value (None) onto the blank case below
    # instead of crashing with AttributeError on ``.strip()``.
    cleaned_address = (email or "").strip().lower()
    if "@" not in cleaned_address:
        # Covers blank values too: returning "" here would make callers hash
        # it, giving every customer without an email the same salt-only hash.
        raise ValueError(f"Could not parse email address: {email!r}")
    if not cleaned_address.endswith("@gmail.com"):
        return cleaned_address
    match = ADDRESS_WITH_OPTIONAL_TAGS_PATTERN.match(cleaned_address)
    if not match:
        raise ValueError(f"Could not parse email address: {email!r}")
    localpart, domain = match.groups()
    localpart = localpart.replace(".", "")
    return f"{localpart}@{domain}"
def normalize_phone_number(phone_number: str) -> str:
    """Return *phone_number* formatted in E.164 (e.g. ``+14155550123``).

    Numbers written without a country code are parsed as US numbers.

    Raises:
        ValueError: if ``phonenumbers`` cannot parse the value, or parses it
            but does not consider it a valid number.
    """
    # Only the parse step raises NumberParseException; keep the try narrow so
    # the validity check below is not accidentally covered by it.
    try:
        parsed_number = phonenumbers.parse(phone_number, "US")
    except phonenumbers.NumberParseException as ex:
        raise ValueError(f"Could not parse phone number: {phone_number!r}") from ex
    if not phonenumbers.is_valid_number(parsed_number):
        raise ValueError(f"Could not normalize phone number: {phone_number!r}")
    return phonenumbers.format_number(
        parsed_number, phonenumbers.PhoneNumberFormat.E164
    )
def salt_and_hash(contact: str):
    """Return the Base64-encoded SHA-256 digest of *contact* plus a fixed salt.

    The literal suffix ``"+curated"`` is appended before hashing, and the text
    is UTF-8 encoded. The result is always a 44-character ASCII string
    (32 digest bytes in Base64).
    """
    digest = sha256((contact + "+curated").encode()).digest()
    encoded = b64encode(digest)
    return encoded.decode()
def convert_to_utc_timestamp(date_str: str, starting_tz: tzinfo) -> str:
    """Parse *date_str* and return it as an ISO-8601 timestamp in UTC.

    A timestamp without an explicit offset is interpreted as wall-clock time
    in *starting_tz*. A timestamp that already carries an offset is converted
    from that offset rather than having it overwritten.

    Raises:
        ValueError: if dateutil cannot parse *date_str*.
    """
    parsed = parse_date(date_str)
    if parsed.tzinfo is None:
        localize = getattr(starting_tz, "localize", None)
        if localize is not None:
            # pytz zones must be attached with localize(). Passing one to
            # replace(tzinfo=...) picks the zone's first historical (LMT)
            # offset, e.g. -07:53 for Los Angeles, instead of PST/PDT.
            parsed = localize(parsed)
        else:
            parsed = parsed.replace(tzinfo=starting_tz)
    return parsed.astimezone(timezone.utc).isoformat()
def _hashed_contact(raw_value, normalize) -> str:
    """Return ``salt_and_hash(normalize(raw_value))``, or "" when not possible.

    Blank or missing values are left blank silently. Values that *normalize*
    rejects with ValueError are left blank with a warning on stderr.
    """
    if not raw_value or not raw_value.strip():
        # Hashing an empty value would give every customer missing this
        # contact detail the same salt-only hash, so leave it blank instead.
        return ""
    try:
        return salt_and_hash(normalize(raw_value))
    except ValueError as ex:
        print(f"{ex} leaving field blank.", file=sys.stderr)
        return ""


def process_row_group(row_group: list[dict]) -> dict:
    """Flatten the line-item rows of one order into a single output row.

    Order-level columns are read from the first row of *row_group* and
    renamed via FIXED_ROW_MAPPING. Email and Phone are normalized and hashed
    (left blank if missing or invalid), and TimeOfSale is converted with
    convert_to_utc_timestamp from PARTNER_TIMEZONE. Each row in the group
    then contributes 1-based ``Product<i>_Brand``, ``_MPN``, ``_Subtotal``
    and ``_Quantity`` columns, and a constant ``Currency`` of ``"USD"`` is
    appended last.

    Raises:
        KeyError: if a required input column is missing.
        IndexError: if *row_group* is empty.
        Exceptions from convert_to_utc_timestamp (e.g. an unparsable
        order_time) propagate unchanged.
    """
    lead_row = row_group[0]
    contact_normalizers = {
        "Email": normalize_email,
        "Phone": normalize_phone_number,
    }
    new_row = {}
    for source_field, target_field in FIXED_ROW_MAPPING.items():
        value = lead_row[source_field]
        if target_field in contact_normalizers:
            new_row[target_field] = _hashed_contact(
                value, contact_normalizers[target_field]
            )
        elif target_field == "TimeOfSale":
            new_row[target_field] = convert_to_utc_timestamp(value, PARTNER_TIMEZONE)
        else:
            new_row[target_field] = value
    # Now for every row in the group, we extract the product details.
    for i, row in enumerate(row_group, 1):
        new_row[f"Product{i}_Brand"] = row["brand"]
        # TODO: We're mapping the "code" to MPN. It's showing up in the event
        # data as the GTIN, but it matches the MPN in the product feed. We
        # should update the event data ASAP so that we can match these up.
        new_row[f"Product{i}_MPN"] = row["code"]
        new_row[f"Product{i}_Subtotal"] = row["line subtotal"]
        new_row[f"Product{i}_Quantity"] = row["items"]
    new_row["Currency"] = "USD"
    return new_row
def grouped_rows(reader):
    """Yield lists of rows that share the same ``order_id``.

    Rows are grouped across the whole input, not just consecutive runs, so an
    order whose line items are not contiguous still yields a single group.
    (``itertools.groupby`` would split such an order into several groups,
    producing duplicate TransactionIDs downstream.) Groups are yielded in
    order of each ``order_id``'s first appearance, and rows keep their input
    order within a group. The entire input is consumed before the first group
    is yielded.

    Raises:
        KeyError: if a row has no ``order_id`` column.
    """
    groups: dict = {}
    for row in reader:
        groups.setdefault(row["order_id"], []).append(row)
    yield from groups.values()
def main(input_file, output_file):
    """Convert the partner's line-item CSV into one output CSV row per order.

    Reads rows from *input_file*, groups them by ``order_id`` with
    grouped_rows, flattens each group with process_row_group, and writes all
    resulting rows, with a header, to *output_file*.

    Raises:
        ValueError: if a required input column is missing, or if processing
            raises ValueError itself (e.g. an unparsable order date).
    """
    reader = DictReader(input_file)
    all_field_names: list = []
    all_rows = []
    try:
        # Grouping reads ``order_id``, so a missing column surfaces here as
        # well as inside process_row_group; keep both inside the try.
        for row_group in grouped_rows(reader):
            processed_row = process_row_group(row_group)
            # Rows differ only in how many Product<i>_* columns they carry,
            # so the row with the most keys has the full column set, in order.
            if len(processed_row) > len(all_field_names):
                all_field_names = list(processed_row)
            all_rows.append(processed_row)
    except KeyError as ex:
        raise ValueError(f"Input data is missing required field: {ex}") from ex
    writer = DictWriter(output_file, all_field_names)
    writer.writeheader()
    writer.writerows(all_rows)
if __name__ == "__main__":
    # Usage: script INPUT_CSV OUTPUT_CSV
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} INPUT_CSV OUTPUT_CSV", file=sys.stderr)
        sys.exit(2)
    try:
        # The csv module requires newline="" on both ends: it preserves
        # newlines embedded in quoted fields and avoids doubled "\r" in output.
        with open(sys.argv[1], "r", newline="") as input_file, open(
            sys.argv[2], "w", newline=""
        ) as output_file:
            main(input_file, output_file)
    except ValueError as ex:
        print(str(ex), file=sys.stderr)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment