import os
os.environ["TF_GPU_ALLOCATOR"]="cuda_malloc_async"
import glob
import numpy as np
import pandas as pd
import gc
import calendar
import datetime
import cudf
import cupy
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
DATA_FOLDER = os.environ.get("DATA_FOLDER", "/workspace/data")
def generate_synthetic_data(
    start_date: datetime.date, end_date: datetime.date, rows_per_day: int = 10000
) -> pd.DataFrame:
    assert end_date > start_date, "end_date must be later than start_date"

    number_of_days = (end_date - start_date).days
    total_number_of_rows = number_of_days * rows_per_day

    # Generate a long-tail distribution of item interactions. This simulates that some items are
    # more popular than others.
    long_tailed_item_distribution = np.clip(
        np.random.lognormal(3.0, 10.0, total_number_of_rows).astype(np.int64), 1, 50000
    )

    # generate random item interaction features
    df = pd.DataFrame(
        {
            "session_id": np.random.randint(10000, 80000, total_number_of_rows),
            "item_id": long_tailed_item_distribution,
        },
    )

    # generate category mapping for each item_id
    df["category"] = pd.cut(df["item_id"], bins=334, labels=np.arange(1, 335)).astype(
        np.int64
    )

    max_session_length = 60 * 60  # 1 hour

    def add_timestamp_to_session(session: pd.DataFrame):
        random_start_date_and_time = calendar.timegm(
            (
                start_date
                # Add day offset from start_date
                + datetime.timedelta(days=np.random.randint(0, number_of_days))
                # Add time offset within the random day
                + datetime.timedelta(seconds=np.random.randint(0, 86_400))
            ).timetuple()
        )
        session["timestamp"] = random_start_date_and_time + np.clip(
            np.random.lognormal(3.0, 1.0, len(session)).astype(np.int64),
            0,
            max_session_length,
        )
        return session

    df = df.groupby("session_id").apply(add_timestamp_to_session).reset_index()
    return df
START_DATE = os.environ.get("START_DATE", "2022/4/1")
END_DATE = os.environ.get("END_DATE", "2022/4/5")
interactions_df = generate_synthetic_data(
    datetime.datetime.strptime(START_DATE, '%Y/%m/%d'),
    datetime.datetime.strptime(END_DATE, '%Y/%m/%d'),
)
interactions_df = cudf.from_pandas(interactions_df)
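# Optional sanity check (added, not in the original gist): inspect the synthetic interactions
# before deduplication; the columns come from generate_synthetic_data() above.
print(interactions_df.head())
print("unique sessions:", interactions_df["session_id"].nunique(),
      "unique items:", interactions_df["item_id"].nunique())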
print("Count with in-session repeated interactions: {}".format(len(interactions_df)))
# Sort the dataframe by session and timestamp so that consecutive repeated interactions can be removed
interactions_df.timestamp = interactions_df.timestamp.astype(int)
interactions_df = interactions_df.sort_values(['session_id', 'timestamp'])
past_ids = interactions_df['item_id'].shift(1).fillna()
session_past_ids = interactions_df['session_id'].shift(1).fillna()
# Keep only interactions that are not consecutive repeats of the same item within a session
interactions_df = interactions_df[~((interactions_df['session_id'] == session_past_ids) & (interactions_df['item_id'] == past_ids))]
print("Count after removed in-session repeated interactions: {}".format(len(interactions_df)))
items_first_ts_df = interactions_df.groupby('item_id').agg({'timestamp': 'min'}).reset_index().rename(columns={'timestamp': 'itemid_ts_first'})
interactions_merged_df = interactions_df.merge(items_first_ts_df, on=['item_id'], how='left')
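# Note (added): 'itemid_ts_first' records each item's first interaction timestamp; it is not
# used further in this script, since the workflow below only selects the session-level columns.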
# free gpu memory
del interactions_df, session_past_ids, items_first_ts_df
gc.collect()
# create time features
session_ts = ColumnSelector(['timestamp'])
session_time = (
    session_ts >>
    nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s')) >>
    nvt.ops.Rename(name='event_time_dt')
)
dayofweek = (
    session_time >>
    nvt.ops.LambdaOp(lambda col: col.dt.weekday) >>
    nvt.ops.Rename(name='dayofweek')
)
# Encodes categorical features as contiguous integers
cat_feats = ColumnSelector(['category', 'item_id']) + dayofweek >> nvt.ops.Categorify()
features = ColumnSelector(['session_id', 'timestamp']) + cat_feats + session_time
# Define Groupby Operator
groupby_features = features >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    sort_cols=["timestamp"],
    aggs={
        'item_id': ["list", "count"],
        'category': ["list"],
        'timestamp': ["first"],
        'event_time_dt': ["first"],
        'dayofweek': ["first"],
    },
    name_sep="-",
)
# Truncate sequence features to the last 20 interactions of each session
# (ListSlice with a negative start keeps the tail of the list)
SESSIONS_MAX_LENGTH = 20
item_feat = groupby_features['item_id-list'] >> nvt.ops.TagAsItemID()
context_feat = groupby_features['dayofweek-first'] >> nvt.ops.AddMetadata(tags=[Tags.CONTEXT])
groupby_features_list = item_feat + groupby_features['category-list'] >> nvt.ops.AddMetadata(tags=[Tags.SEQUENCE])
groupby_features_truncated = groupby_features_list >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
# Select features for training
selected_features = groupby_features['session_id', 'item_id-count'] + groupby_features_truncated + context_feat
# Filter out sessions with fewer than 2 interactions (a session needs at least one input item and one target)
MINIMUM_SESSION_LENGTH = 2
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)
dataset = nvt.Dataset(interactions_merged_df)
workflow = nvt.Workflow(filtered_sessions['session_id', 'item_id-list', 'category-list', 'dayofweek-first'])
sessions_gdf = workflow.fit_transform(dataset)
sessions_gdf.to_parquet(os.path.join(DATA_FOLDER, "processed_nvt"))
workflow.save(os.path.join(DATA_FOLDER, "workflow_etl"))
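# Reuse sketch (added, not in the original gist): the fitted workflow saved above can be
# reloaded later, e.g. in an inference pipeline, and applied to new raw interactions with
# `reloaded_workflow.transform(nvt.Dataset(new_interactions_df))`. This assumes your
# NVTabular version exposes `Workflow.load`.
reloaded_workflow = nvt.Workflow.load(os.path.join(DATA_FOLDER, "workflow_etl"))
print(reloaded_workflow.output_schema.column_names)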
import tensorflow as tf
from merlin.schema.tags import Tags
from merlin.io.dataset import Dataset
import merlin.models.tf as mm
train = Dataset(os.path.join(DATA_FOLDER, 'processed_nvt/', 'part_0.parquet'))
train.schema = train.schema.select_by_name(['item_id-list', 'category-list', 'dayofweek-first'])
seq_schema = train.schema.select_by_tag(Tags.SEQUENCE)
context_schema = train.schema.select_by_tag(Tags.CONTEXT)
target_schema = train.schema.select_by_tag(Tags.ITEM)
target = target_schema.column_names[0]
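# Quick check (added): confirm which columns landed under each tag, since the model wiring
# below depends on these selections.
print("sequence columns:", seq_schema.column_names)
print("context columns:", context_schema.column_names)
print("target column:", target)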
dmodel = int(os.environ.get("dmodel", '32'))
input_block = mm.InputBlockV2(
    train.schema,
    embeddings=mm.Embeddings(
        (seq_schema + context_schema).select_by_tag(Tags.CATEGORICAL),
        sequence_combiner=None,
        dim=dmodel,
    ),
    # BroadcastToSequence is only needed when there are context features (such as user features).
    # If you do NOT have context features, do not pass this `post` argument.
    post=mm.BroadcastToSequence(context_schema, seq_schema),
)
xlnet_block = mm.XLNetBlock(d_model=dmodel, n_head=2, n_layer=2)
item_id_name = train.schema.select_by_tag(Tags.ITEM).first.properties['domain']['name']
print(item_id_name)
# If you want to apply sampled softmax, set `sampled_softmax=True`.
def get_output_block(schema, input_block=None, weight_tying=True, sampled_softmax=False, logq_correction=True):
    if weight_tying:
        candidate_table = input_block["categorical"][item_id_name]
        to_call = candidate_table
    else:
        candidate = schema.select_by_tag(Tags.ITEM_ID)
        to_call = candidate
    if sampled_softmax:
        print("applying sampled softmax")
        outputs = mm.ContrastiveOutput(
            to_call=to_call,
            # logits_temperature=logits_temperature,
            negative_samplers=mm.PopularityBasedSamplerV2(
                max_num_samples=1000,  # you can change this value
                max_id=5384,  # this value comes from the schema file; check the max value of your item_id
                min_id=2,  # this value also depends on the min value of your processed item_id data
            ),
            logq_sampling_correction=logq_correction,
        )
    else:
        print("NO sampled softmax, scoring against the whole catalog")
        outputs = mm.CategoricalOutput(
            to_call=to_call,
        )
    return outputs
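# Usage sketch (added, not in the original gist): the same helper can build a sampled-softmax
# head instead of full-catalog scoring. Guarded by a hypothetical USE_SAMPLED_SOFTMAX env var
# so it is off by default; max_id/min_id inside get_output_block must match the item_id range
# of your own processed data.
if os.environ.get("USE_SAMPLED_SOFTMAX", "0") == "1":
    sampled_output_block = get_output_block(
        train.schema, input_block=input_block, weight_tying=True, sampled_softmax=True
    )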
d_model = dmodel
weight_tying = True
# get output block
output_block = get_output_block(train.schema, input_block=input_block)
# Define the session encoder
if weight_tying:
    # Project the transformer's output to the same dimension as the target item embeddings
    projection = mm.MLPBlock(
        [128, output_block.to_call.table.dim],
        no_activation_last_layer=True,
    )
    session_encoder = mm.Encoder(
        input_block,
        mm.MLPBlock([128, dmodel], no_activation_last_layer=True),
        xlnet_block,
        projection,
    )
else:
    session_encoder = mm.Encoder(
        input_block,
        mm.MLPBlock([d_model], no_activation_last_layer=True),
        xlnet_block,
    )
model_transformer = mm.RetrievalModelV2(query=session_encoder, output=output_block)
EPOCHS = int(os.environ.get("EPOCHS", '1'))
BATCH_SIZE = 1024
LEARNING_RATE = 0.005
optimizer = tf.keras.optimizers.Adam(
    learning_rate=LEARNING_RATE,
)
# get loss
loss = tf.keras.losses.CategoricalCrossentropy(
    from_logits=True
)
model_transformer.compile(
    run_eagerly=False,
    optimizer=optimizer,
    loss=loss,
    metrics=mm.TopKMetricsAggregator.default_metrics(top_ks=[10]),
)
model_transformer.fit(
    train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    pre=mm.SequenceMaskRandom(schema=seq_schema, target=target, masking_prob=0.3, transformer=xlnet_block),
    # drop_last=True
)
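# Evaluation sketch (added, not in the original gist): next-item evaluation usually masks only
# the last item of each session. This assumes `mm.SequenceMaskLast` is available in your
# merlin-models version; no validation split is created in this script, so `train` is reused.
model_transformer.evaluate(
    train,
    batch_size=BATCH_SIZE,
    pre=mm.SequenceMaskLast(schema=seq_schema, target=target, transformer=xlnet_block),
    return_dict=True,
)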
# NOTE: replace `train` with a validation dataset when one is available.
loader = mm.Loader(train, batch_size=BATCH_SIZE)
batch = loader.peek()
# Test with a single batch only -- this works
model_transformer.query_encoder(batch[0])
# Iterate over the batches -- DOES NOT WORK
all_sess_embeddings = []
for batch, _ in iter(loader):
    embds = model_transformer.query_encoder(batch).numpy()
    del batch
    gc.collect()
    all_sess_embeddings.append(embds)
# USE `query_embeddings()` method -- DOES NOT WORK
train = Dataset(os.path.join(DATA_FOLDER, 'processed_nvt/', 'part_0.parquet'))
model_transformer.query_embeddings(train, batch_size=1024, index='session_id')