Skip to content

Instantly share code, notes, and snippets.

@octavian-zarzu
Created May 9, 2023 20:45
Show Gist options
  • Save octavian-zarzu/03fc3285265d6f1fa1af87586b9a6e82 to your computer and use it in GitHub Desktop.
Save octavian-zarzu/03fc3285265d6f1fa1af87586b9a6e82 to your computer and use it in GitHub Desktop.
testrandom
import collections
import csv
import dataclasses
import datetime
import io
import itertools
import logging
import os
import json
import random
import typing
import uuid
import sys
# import time
import boto3
import faker
import numpy as np
fake = faker.Faker()
# final datasets
orders = list()
users = list()
order_items = list()
events = list()
inventory_items = list()
# s3 client
s3_client = boto3.client("s3")
# kinesis client
kinesis_client = boto3.client("kinesis")
# source bucket info
source_aws_bucket=os.environ["SOURCE_AWS_BUCKET"]
source_aws_prefix=os.environ["SOURCE_AWS_PREFIX"]
# read from local csv and return products
def generate_products() -> typing.List[dict]:
product_brand_dict = {} # products partitioned by brand - unused
product_category_dict = {} # product partitioned by cateogry - unused
gender_category_dict = {} # products partitioned by gender and category - unused
product_id_dict = {} # products to generate events table - unused
product_gender_dict = {} # product partitioned by gender
product_by_id_dict = {} # products partitioned by product ID
products = collections.defaultdict(list)
key = source_aws_prefix + '/products.csv'
response = s3_client.get_object(Bucket=source_aws_bucket, Key=key)
data = response["Body"].read().decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(data))
for row in csv_reader:
for k, v in row.items():
products[k].append(v)
product_id = products["id"]
brands = products["brand"]
name = products["name"]
cost = products["cost"]
category = products["category"]
department = products["department"]
sku = products["sku"]
retail_price = products["retail_price"]
distribution_center_id = products["distribution_center_id"]
for _ in range(len(brands)):
product_brand_dict[brands[_]] = []
product_category_dict[category[_]] = []
product_id_dict[product_id[_]] = []
product_by_id_dict[product_id[_]] = []
if department[_] == "Men":
product_gender_dict["M"] = []
gender_category_dict["M" + category[_]] = []
if department[_] == "Women":
product_gender_dict["F"] = []
gender_category_dict["F" + category[_]] = []
for col in list(
zip(
product_id,
brands,
name,
cost,
category,
department,
sku,
retail_price,
distribution_center_id,
)
):
product_id = col[0]
brand = col[1]
name = col[2]
cost = col[3]
category = col[4]
department = col[5]
sku = col[6]
retail_price = col[7]
distribution_center_id = col[8]
product_by_id_dict[product_id] = {
"brand": brand,
"name": name,
"cost": cost,
"category": category,
"department": department,
"sku": sku,
"retail_price": retail_price,
"distribution_center_id": distribution_center_id,
}
product_brand_dict[brand].append(col)
product_category_dict[category].append(col)
if department == "Men":
product_gender_dict["M"].append(col)
gender_category_dict["M" + category].append(col)
if department == "Women":
product_gender_dict["F"].append(col)
gender_category_dict["F" + category].append(col)
# helper dict to generate events
for col in list(zip(product_id, brands, category, department)):
product_id_dict[col[0]] = {
"brand": col[1],
"category": col[2],
"department": col[3],
}
return product_gender_dict, product_by_id_dict, products
# read from local csv and return locations
def generate_locations() -> typing.List[str]:
key = source_aws_prefix + '/world_pop.csv'
response = s3_client.get_object(Bucket=source_aws_bucket, Key=key)
data = response["Body"].read().decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(data))
location_data = [row for row in csv_reader]
return location_data
SECONDS_IN_MINUTE = 60
MINUTES_IN_HOUR = 60
MINUTES_IN_DAY = 1440
MIN_AGE = 12
MAX_AGE = 71
products = generate_products()
logging.info("generating products helper dict")
logging.info("generating locations data")
LOCATION_DATA = generate_locations()
PRODUCT_GENDER_DICT = products[0]
PRODUCT_BY_ID_DICT = products[1]
def main(
num_of_users: int,
num_of_ghost_events: int,
extraneous_headers: typing.List[str],
) -> None:
# read and generate location
logging.info("generating data")
for user_num in range(int(num_of_users)):
if user_num % 10000 == 0:
logging.info(f"user transaction {user_num}")
users.append(dataclasses.asdict(Users()))
# remove extraneous keys in order_items
logging.info("remove extraneous keys from order items")
for oi in order_items:
for key in extraneous_headers:
del oi[key]
# generate ghost events
logging.info("generating ghost events")
for user_num in range(int(num_of_users) * int(num_of_ghost_events)):
logging.info(f"ghost event {user_num}")
GhostEvents()
# write generated data to Kinesis Streams
table_dat = [users, orders, order_items, events, inventory_items]
table_name = ["users", "orders", "order_items", "events", "inventory_items"]
stream_names = ["data-stream-users", "data-stream-orders", "data-stream-order-items", "data-stream-events", "data-stream-inventory-items"]
for name, table_dat, stream_name in list(zip(table_name, table_dat, stream_names)):
logging.info(f"converting {name} dict to csv")
csv_data = dict_to_csv(table_dat)
partition_key = str(uuid.uuid4())
logging.info(
f"sending output file {name} to stream... {stream_name}"
)
send_to_kinesis_stream(
stream_name=stream_name,
csv_data=csv_data,
partition_key=partition_key
)
# upload static data to AWS
# file_names = ["products.csv", "distribution_centers.csv"]
# for file in file_names:
# logging.info(
# f"uploading output file to... s3://{target_aws_bucket}/{target_aws_prefix}/{file}"
# )
# upload_to_bucket(
# target_bucket=target_aws_bucket,
# target_prefix=target_aws_prefix,
# target_object=f"{file}",
# source_filepath=f"{source_dir}/{file}",
# )
# returns random address based off specified distribution
def get_address(
*, country: str = "*", state: str = "*", postal_code: str = "*"
) -> dict:
# country = '*' OR country = 'USA' OR country={'USA':.75,'UK':.25}
# state = '*' OR state = 'California' OR state={'California':.75,'New York':.25}
# postal_code = '*' OR postal_code = '95060' OR postal_code={'94117':.75,'95060':.25}
# type checking is used to provide flexibility of inputs to function (ie. can be dict with proportions, or could be single string value)
universe = []
if postal_code != "*":
if type(postal_code) == str:
universe += list(
filter(lambda row: row["postal_code"] == postal_code, LOCATION_DATA)
)
elif type(postal_code) == dict:
universe += list(
filter(
lambda row: row["postal_code"] in postal_code.keys(), LOCATION_DATA
)
)
if state != "*":
if type(state) == str:
universe += list(filter(lambda row: row["state"] == state, LOCATION_DATA))
elif type(state) == dict:
universe += list(
filter(lambda row: row["state"] in state.keys(), LOCATION_DATA)
)
if country != "*":
if type(country) == str:
universe += list(
filter(lambda row: row["country"] == country, LOCATION_DATA)
)
elif type(country) == dict:
universe += list(
filter(lambda row: row["country"] in country.keys(), LOCATION_DATA)
)
if len(universe) == 0:
universe = LOCATION_DATA
total_pop = sum([int(loc["population"]) for loc in universe])
for loc in universe:
loc["population"] = int(loc["population"])
if type(postal_code) == dict:
if loc["postal_code"] in postal_code.keys():
loc["population"] = postal_code[loc["postal_code"]] * total_pop
if type(state) == dict:
if loc["state"] in state.keys():
loc["population"] = (
state[loc["state"]]
* (
loc["population"]
/ sum(
[
loc2["population"]
for loc2 in universe
if loc["state"] == loc2["state"]
]
)
)
* total_pop
)
if type(country) == dict:
if loc["country"] in country.keys():
loc["population"] = (
country[loc["country"]]
* (
loc["population"]
/ sum(
[
loc2["population"]
for loc2 in universe
if loc["country"] == loc2["country"]
]
)
)
* total_pop
)
loc = random.choices(
universe, weights=[loc["population"] / total_pop for loc in universe]
)[0]
return {
"street": fake.street_address(),
"city": loc["city"],
"state": loc["state"],
"postal_code": loc["postal_code"],
"country": loc["country"],
"latitude": loc["latitude"],
"longitude": loc["longitude"],
}
# generates random date between now and specified date
def created_at(start_date: datetime.datetime) -> datetime.datetime:
end_date = datetime.datetime.now()
time_between_dates = end_date - start_date
days_between_dates = time_between_dates.days
if days_between_dates <= 1:
days_between_dates = 2
random_number_of_days = random.randrange(1, days_between_dates)
created_at = (
start_date
+ datetime.timedelta(days=random_number_of_days)
+ datetime.timedelta(minutes=random.randrange(MINUTES_IN_HOUR * 19))
)
return created_at
# generate URI for events table
def generate_uri(event: str, product: str) -> str:
if event == "product":
return f"/{event}/{product[0]}"
elif event == "department":
return f"""/{event}/{product[5].lower()}/category/{product[4].lower().replace(" ", "")}/brand/{product[1].lower().replace(" ", "")}"""
else:
return f"/{event}"
# # converts list of dicts into csv format
# def dict_to_csv(data: dict) -> str:
# output = io.StringIO()
# writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)
# header_writer = csv.DictWriter(output, fieldnames=data[0].keys())
# header_writer.writeheader()
# for dat in data:
# writer.writerow(dat.values())
# return output.getvalue()
def dict_to_csv(data: list) -> str:
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=data[0].keys(), quoting=csv.QUOTE_NONNUMERIC)
writer.writeheader()
writer.writerows(data)
csv_file = output.getvalue()
logging.info('Data size: ' + str(len(csv_file.encode('utf-8'))))
output.close()
return csv_file
def send_to_kinesis_stream(stream_name: str, csv_data: str, partition_key: str) -> None:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=csv_data,
PartitionKey=partition_key
)
print(response)
# def upload_to_bucket(
# target_bucket: str,
# target_prefix: str,
# target_object: str,
# source_data: list = None,
# source_filepath: str = None,
# ) -> str:
# s3_path = f"{target_prefix}/{target_object}"
# content_type = "text/csv"
# if source_data:
# s3_client.put_object(Body=str(source_data), Bucket=target_bucket, Key=s3_path, ContentType=content_type)
# else:
# s3_client.upload_file(source_filepath, target_bucket, s3_path, ExtraArgs={'ContentType': content_type})
# return f"https://{target_bucket}.s3.amazonaws.com/{s3_path}"
# upload_local function for testing locally
# def upload_local(
# target_bucket: str,
# target_prefix: str,
# target_object: str,
# source_data: list = None,
# source_filepath: str = None,
# ) -> str:
# # Create target directory if it doesn't exist
# full_directory = os.path.join(target_bucket, target_prefix)
# os.makedirs(full_directory, exist_ok=True)
# # Create the target file path
# target_filepath = os.path.join(full_directory, target_object)
# print(source_data)
# if source_data:
# # Save source data to the target file
# with open(target_filepath, 'w', encoding='utf-8') as f:
# f.write(str(source_data))
# else:
# # Copy the source file to the target file
# import shutil
# shutil.copyfile(source_filepath, target_filepath)
# return target_filepath
# utility class
class DataUtil:
def child_created_at(
self, probability: str = "uniform"
) -> datetime.datetime: # returns a random timestamp between now and parent date
time_between_dates = datetime.datetime.now() - self.parent.created_at
days_between_dates = time_between_dates.days
if days_between_dates <= 1:
days_between_dates = 2
random_number_of_days = random.randrange(
1, days_between_dates
) # generates random day between now and when user initially got created
created_at = self.parent.created_at + datetime.timedelta(
days=random_number_of_days
)
return created_at
def random_item(
self, population, **distribution
) -> str: # returns single random item from a list based off distribution
if distribution:
return random.choices(
population=population, weights=distribution["distribution"]
)[0]
else:
return random.choices(population=population)[0]
@dataclasses.dataclass
class Address(object):
def __init__(self, data: dict):
self.street = data["street"]
self.city = data["city"]
self.state = data["state"]
self.postal_code = data["postal_code"]
self.country = data["country"]
self.latitude = data["latitude"]
self.longitude = data["longitude"]
def __str__(self):
return f"{self.street} \n{self.city}, {self.state} \n{self.postal_code} \n{self.country} \n{self.latitude} \n{self.longitude}"
@dataclasses.dataclass
class Users(DataUtil):
logging.info("generating user")
id: int = dataclasses.field(default_factory=itertools.count(start=1).__next__)
first_name: str = dataclasses.field(init=False)
last_name: str = dataclasses.field(init=False)
email: str = dataclasses.field(init=False)
age: int = dataclasses.field(init=False)
gender: str = dataclasses.field(init=False)
state: str = dataclasses.field(init=False)
street_address: str = dataclasses.field(init=False)
postal_code: str = dataclasses.field(init=False)
city: str = dataclasses.field(init=False)
country: str = dataclasses.field(init=False)
latitude: float = dataclasses.field(init=False)
longitude: float = dataclasses.field(init=False)
traffic_source: str = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
def __post_init__(self):
self.gender = self.random_item(population=["M", "F"]) # uniform distribution
if self.gender == "M":
self.first_name = fake.first_name_male()
self.traffic_source = self.random_item(
population=["Organic", "Facebook", "Search", "Email", "Display"],
distribution=[0.15, 0.06, 0.7, 0.05, 0.04],
)
if self.gender == "F":
self.first_name = fake.first_name_female()
self.traffic_source = self.random_item(
population=["Organic", "Facebook", "Search", "Email", "Display"],
distribution=[0.15, 0.06, 0.7, 0.05, 0.04],
)
self.last_name = fake.last_name_nonbinary()
address = Address(get_address())
self.state = address.state
self.street_address = address.street
self.postal_code = address.postal_code
self.city = address.city
self.country = address.country
self.latitude = address.latitude
self.longitude = address.longitude
self.email = f"{self.first_name.lower()}{self.last_name.lower()}@{fake.safe_domain_name()}"
self.age = random.randrange(MIN_AGE, MAX_AGE)
# weight newer users/orders
choice = random.choices([0, 1], weights=[0.975, 0.025])[0]
if choice == 0:
self.created_at = created_at(datetime.datetime(2019, 1, 1))
if choice == 1:
self.created_at = created_at(
datetime.datetime.now() - datetime.timedelta(days=7)
)
num_of_orders = random.choices(
population=[0, 1, 2, 3, 4], weights=[0.2, 0.5, 0.2, 0.05, 0.05]
)[0]
if num_of_orders == 0:
pass
else:
for _ in range(num_of_orders):
orders.append(dataclasses.asdict(Order(user=self)))
def __str__(self):
return f"{self.id}, {self.first_name}, {self.last_name}, {self.email}, {self.age}, {self.gender}, {self.state}, {self.street_address}, {self.postal_code}, {self.city}, {self.traffic_source}, {self.created_at}"
@dataclasses.dataclass
class Product:
logging.info("generating product")
product_id: int = dataclasses.field(init=False)
brand: str = dataclasses.field(init=False)
name: str = dataclasses.field(init=False)
cost: float = dataclasses.field(init=False)
category: str = dataclasses.field(init=False)
department: str = dataclasses.field(init=False)
sku: str = dataclasses.field(init=False)
retail_price: float = dataclasses.field(init=False)
distribution_center_id: int = dataclasses.field(init=False)
def __post_init__(self):
person = Users()
random_idx = np.random.choice(
a=len(PRODUCT_GENDER_DICT[person.gender]), size=1
)[0]
product = PRODUCT_GENDER_DICT[person.gender][random_idx]
self.brand = product[0]
self.name = product[1]
self.cost = product[2]
self.category = product[3]
self.department = product[4]
self.sku = product[5]
self.retail_price = product[6]
self.distribution_center_id = product[7]
def __str__(self):
return f"{self.brand}, {self.name}, {self.cost}, {self.category}, {self.department}, {self.sku}, {self.retail_price}, {self.distribution_center_id}"
@dataclasses.dataclass
class Order(DataUtil):
logging.info("generating order")
order_id: int = dataclasses.field(default_factory=itertools.count(start=1).__next__)
user_id: int = dataclasses.field(init=False)
status: str = dataclasses.field(init=False)
gender: str = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
returned_at: datetime.datetime = dataclasses.field(init=False)
shipped_at: datetime.datetime = dataclasses.field(init=False)
delivered_at: datetime.datetime = dataclasses.field(init=False)
num_of_item: int = dataclasses.field(init=False)
user: dataclasses.InitVar[typing.Any] = None
def __post_init__(self, user=None):
self.parent = user
self.user_id = user.id
self.gender = user.gender
self.status = self.random_item(
population=["Complete", "Cancelled", "Returned", "Processing", "Shipped"],
distribution=[0.25, 0.15, 0.1, 0.2, 0.3],
)
self.created_at = self.child_created_at()
# add random generator for days it takes to ship, deliver, return etc.
if self.status == "Returned":
self.shipped_at = self.created_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 3)
) # shipped between 0-3 days after order placed
self.delivered_at = self.shipped_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 5)
) # delivered between 0-5 days after ship date
self.returned_at = self.delivered_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 3)
) # returned 0-3 days after order is delivered
elif self.status == "Complete":
self.shipped_at = self.created_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 3)
) # shipped between 0-3 days after order placed
self.delivered_at = self.shipped_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 5)
) # delivered between 0-5 days after ship date
self.returned_at = None
elif self.status == "Shipped":
self.shipped_at = self.created_at + datetime.timedelta(
minutes=random.randrange(MINUTES_IN_DAY * 3)
) # shipped between 0-3 days after order placed
self.delivered_at = None
self.returned_at = None
else:
self.shipped_at = None
self.delivered_at = None
self.returned_at = None
self.user = user # pass person object to order_items
# randomly generate number of items in an order
num_of_items = self.random_item(
population=[1, 2, 3, 4], distribution=[0.7, 0.2, 0.05, 0.05]
)
self.num_of_item = num_of_items
for _ in range(num_of_items):
order_items.append(dataclasses.asdict(OrderItem(order=self)))
def __str__(self):
return f"{self.order_id}, {self.user_id}, {self.status}, {self.created_at}, {self.shipped_at}, {self.delivered_at}, {self.returned_at}"
@dataclasses.dataclass
class Events:
logging.info("generating event")
id: int = dataclasses.field(default_factory=itertools.count(start=1).__next__)
user_id: int = dataclasses.field(init=False)
sequence_number: int = dataclasses.field(init=False)
session_id: str = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
# inventory_item_id:int = field(init=False)
ip_address: str = dataclasses.field(init=False)
city: str = dataclasses.field(init=False)
state: str = dataclasses.field(init=False)
postal_code: str = dataclasses.field(init=False)
browser: str = dataclasses.field(init=False)
traffic_source: str = dataclasses.field(init=False)
uri: str = dataclasses.field(init=False)
event_type: str = dataclasses.field(init=False)
order_item: dataclasses.InitVar[typing.Any] = None
def __post_init__(self, order_item=None):
self.sequence_number = order_item.sequence_number
self.user_id = order_item.user_id
self.created_at = order_item.created_at
self.session_id = order_item.session_id
self.ip_address = order_item.ip_address
self.city = order_item.person.city
self.state = order_item.person.state
self.postal_code = order_item.person.postal_code
self.event_type = order_item.event_type
self.browser = order_item.browser
self.uri = order_item.uri
self.traffic_source = order_item.traffic_source
def __str__(self):
return f"{self.created_at}, {self.ip_address}, {self.city}, {self.state}, {self.postal_code}"
inv_item_id = 0
@dataclasses.dataclass
class OrderItem(DataUtil):
logging.info("generating order item")
id: int = dataclasses.field(default_factory=itertools.count(start=1).__next__)
order_id: int = dataclasses.field(init=False)
user_id: int = dataclasses.field(init=False)
product_id: int = dataclasses.field(init=False)
inventory_item_id: int = dataclasses.field(init=False)
status: str = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
shipped_at: datetime.datetime = dataclasses.field(init=False)
delivered_at: datetime.datetime = dataclasses.field(init=False)
returned_at: datetime.datetime = dataclasses.field(init=False)
sale_price: float = dataclasses.field(init=False)
# extras
event_type: str = dataclasses.field(init=False)
ip_address: str = dataclasses.field(init=False)
browser: str = dataclasses.field(init=False)
traffic_source: str = dataclasses.field(init=False)
session_id: str = dataclasses.field(init=False)
sequence_number: int = dataclasses.field(init=False)
uri: str = dataclasses.field(init=False)
is_sold: bool = dataclasses.field(init=False)
order: dataclasses.InitVar[typing.Any] = None
def __post_init__(self, order=None):
global inv_item_id
self.order_id = order.order_id
self.user_id = order.user_id
inv_item_id = inv_item_id + 1
self.inventory_item_id = inv_item_id
self.status = order.status
self.created_at = order.created_at - datetime.timedelta(
seconds=random.randrange(SECONDS_IN_MINUTE * 240)
) # order purchased within 4 hours
self.shipped_at = order.shipped_at
self.delivered_at = order.delivered_at
self.returned_at = order.returned_at
random_idx = np.random.choice(a=len(PRODUCT_GENDER_DICT[order.gender]), size=1)[
0
]
product = PRODUCT_GENDER_DICT[order.gender][random_idx]
self.product_id = product[0]
self.sale_price = product[7]
self.ip_address = fake.ipv4()
self.browser = self.random_item(
population=["IE", "Chrome", "Safari", "Firefox", "Other"],
distribution=[0.05, 0.5, 0.2, 0.2, 0.05],
)
self.traffic_source = self.random_item(
population=["Email", "Adwords", "Organic", "YouTube", "Facebook"],
distribution=[0.45, 0.3, 0.05, 0.1, 0.1],
)
self.session_id = str(uuid.uuid4())
self.person = order.user # pass person object to events
self.is_sold = True
previous_created_at = None
# Generate Events Table
if order.num_of_item == 1: # if only 1 item in order go through flow
for idx, val in enumerate(
["home", "department", "product", "cart", "purchase"]
):
self.sequence_number = idx + 1
self.event_type = val
self.uri = generate_uri(val, product)
events.append(dataclasses.asdict(Events(order_item=self)))
previous_created_at = self.created_at
self.created_at = previous_created_at + datetime.timedelta(
seconds=random.randrange(SECONDS_IN_MINUTE * 3)
)
else: # if multiple items
sequence_num = 0 # track sequence num of purchase event
for _ in range(order.num_of_item):
for event in ["department", "product", "cart"]:
sequence_num += 1
self.sequence_number = sequence_num
self.event_type = event
self.uri = generate_uri(event, product)
events.append(dataclasses.asdict(Events(order_item=self)))
sequence_num = self.sequence_number
previous_created_at = self.created_at
self.created_at = previous_created_at + datetime.timedelta(
seconds=random.randrange(180)
)
self.sequence_number = sequence_num + 1
self.created_at += datetime.timedelta(random.randrange(5))
self.event_type = "purchase"
self.uri = generate_uri("purchase", product)
events.append(dataclasses.asdict(Events(order_item=self)))
# sold inventory item
inventory_items.append(dataclasses.asdict(InventoryItem(order_item=self)))
# unsold inventory items
num_of_items = self.random_item(
population=[1, 2, 3], distribution=[0.5, 0.3, 0.2]
)
for _ in range(num_of_items):
self.is_sold = False
inv_item_id += 1
self.inventory_item_id = inv_item_id
inventory_items.append(dataclasses.asdict(InventoryItem(order_item=self)))
@dataclasses.dataclass
class InventoryItem:
id: int = dataclasses.field(init=False)
product_id: int = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
sold_at: datetime.datetime = dataclasses.field(init=False)
cost: float = dataclasses.field(init=False)
product_category: str = dataclasses.field(init=False)
product_name: str = dataclasses.field(init=False)
product_brand: str = dataclasses.field(init=False)
product_retail_price: float = dataclasses.field(init=False)
product_department: str = dataclasses.field(init=False)
product_sku: str = dataclasses.field(init=False)
product_distribution_center_id: int = dataclasses.field(init=False)
order_item: dataclasses.InitVar[typing.Any] = None
def __post_init__(self, order_item=None):
self.id = order_item.inventory_item_id
self.product_id = order_item.product_id
if order_item.is_sold is True:
self.created_at = order_item.created_at - datetime.timedelta(
minutes=random.randrange(86400)
) # in inventory between 0 and 60 days
self.sold_at = (
order_item.created_at
) # sold on the date/time the order_items was logged
if order_item.is_sold is False:
self.created_at = created_at(datetime.datetime(2020, 1, 1))
self.sold_at = None
self.cost = PRODUCT_BY_ID_DICT[self.product_id]["cost"]
self.product_category = PRODUCT_BY_ID_DICT[self.product_id]["category"]
self.product_name = PRODUCT_BY_ID_DICT[self.product_id]["name"]
self.product_brand = PRODUCT_BY_ID_DICT[self.product_id]["brand"]
self.product_retail_price = PRODUCT_BY_ID_DICT[self.product_id]["retail_price"]
self.product_department = PRODUCT_BY_ID_DICT[self.product_id]["department"]
self.product_sku = PRODUCT_BY_ID_DICT[self.product_id]["sku"]
self.product_distribution_center_id = PRODUCT_BY_ID_DICT[self.product_id][
"distribution_center_id"
]
def __str__(self):
return f"{self.id}, {self.product_id}, {self.created_at}, {self.cost}, {self.product_category}, {self.product_name}, {self.product_brand}, {self.product_retail_price}, {self.product_department}, {self.product_sku}, {self.product_distribution_center_id}"
@dataclasses.dataclass
class GhostEvents(DataUtil):
id: int = dataclasses.field(init=False)
user_id: int = dataclasses.field(init=False)
sequence_number: int = dataclasses.field(init=False)
session_id: str = dataclasses.field(init=False)
created_at: datetime.datetime = dataclasses.field(init=False)
ip_address: str = dataclasses.field(init=False)
city: str = dataclasses.field(init=False)
state: str = dataclasses.field(init=False)
postal_code: str = dataclasses.field(init=False)
browser: str = dataclasses.field(init=False)
traffic_source: str = dataclasses.field(init=False)
uri: str = dataclasses.field(init=False)
event_type: str = dataclasses.field(init=False)
def __post_init__(self):
address = get_address()
self.sequence_number = 0
self.user_id = None
self.created_at = created_at(datetime.datetime(2019, 1, 1))
self.session_id = str(uuid.uuid4())
self.ip_address = fake.ipv4()
self.city = address["city"]
self.state = address["state"]
self.postal_code = address["postal_code"]
self.browser = self.random_item(
population=["IE", "Chrome", "Safari", "Firefox", "Other"],
distribution=[0.05, 0.5, 0.2, 0.2, 0.05],
)
self.traffic_source = self.random_item(
population=["Email", "Adwords", "Organic", "YouTube", "Facebook"],
distribution=[0.45, 0.3, 0.05, 0.1, 0.1],
)
products = PRODUCT_GENDER_DICT[
self.random_item(population=["M", "F"], distribution=[0.5, 0.5])
]
product = self.random_item(products)
# different event type combinations
cancelled_browsing = ["product", "cart", "cancel"]
abandoned_cart = ["department", "product", "cart"]
viewed_product = ["product"]
viewed_department = ["department", "product"]
random_events = self.random_item(
population=[
cancelled_browsing,
abandoned_cart,
viewed_product,
viewed_department,
]
)
for event in random_events:
# set ghost events ID to max of original
event_id = len(events)
self.id = event_id + 1
event_id = self.id
self.event_type = event
self.uri = generate_uri(event, product)
self.sequence_number += 1
self.created_at = self.created_at + datetime.timedelta(
minutes=random.randrange(int(MINUTES_IN_HOUR * 0.5))
)
events.append(dataclasses.asdict(self))
def __str__(self):
return f"{self.created_at}, {self.ip_address}, {self.city}, {self.state}, {self.postal_code}"
# if __name__ == "__main__":
def lambda_handler(event, context):
logging.getLogger().setLevel(logging.INFO)
logging.info("generating products helper dict")
main(
num_of_users=int(os.environ["NUM_OF_USERS"]),
num_of_ghost_events=int(os.environ["NUM_OF_GHOST_EVENTS"]),
extraneous_headers=json.loads(os.environ["EXTRANEOUS_HEADERS"])
)
logging.info('Number of users: ' + os.environ["NUM_OF_USERS"])
logging.info('Number of ghost events: ' + os.environ["NUM_OF_GHOST_EVENTS"])
logging.info('Source bucket + prefix: ' + os.environ["SOURCE_AWS_BUCKET"] + '/' + os.environ["SOURCE_AWS_PREFIX"])
logging.info('Extraneous headers: ' + os.environ["EXTRANEOUS_HEADERS"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment