/sessionize_dask_cudf.py Secret
Last active
May 11, 2021 20:27
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_session_id(df, time_out):
    """
    Create a ``session_id`` column assigning each click to a session.

    The session id increases for each of a user's subsequent sessions;
    session boundaries are defined by ``time_out``. The actual boundary
    detection and id numbering are done by the Numba CUDA kernels
    ``make_session_change_flag_kernel`` and ``populate_session_ids_kernel``
    (defined elsewhere).

    NOTE(review): the kernels presumably expect the rows of this partition
    to be ordered by user and timestamp, and ``time_out`` to be in the same
    units as ``tstamp_inSec`` (seconds, judging by the name) -- confirm.

    Parameters
    ----------
    df : cudf.DataFrame
        One partition of clicks with ``wcs_user_sk`` and ``tstamp_inSec``
        columns.
    time_out : int
        Session timeout, forwarded unchanged to the change-flag kernel.

    Returns
    -------
    cudf.DataFrame
        ``df`` with an int32 ``session_id`` column added.

    Side effects
    ------------
    A ``session_change_flag`` column is assigned on the input ``df`` before
    it is renamed on the returned frame.
    """
    n_rows = len(df)

    # Preallocate the destination column; the kernels write into its buffer.
    df["session_change_flag"] = cp.zeros(n_rows, dtype="int32")

    # Device array views over the column buffers, passed to the kernels.
    wcs_user_sk = df["wcs_user_sk"]._column.data_array_view
    tstamp_inSec = df["tstamp_inSec"]._column.data_array_view
    session_change_flag = df["session_change_flag"]._column.data_array_view

    # Configure both kernels with one task per row.
    conf_session_change_flag_kernel = make_session_change_flag_kernel.forall(
        n_rows
    )
    conf_populate_session_ids_kernel = populate_session_ids_kernel.forall(
        n_rows
    )

    # Determine session boundaries.
    conf_session_change_flag_kernel(
        wcs_user_sk, tstamp_inSec, session_change_flag, time_out
    )
    # Populate session ids (in place, over the same buffer).
    conf_populate_session_ids_kernel(session_change_flag)

    df = df.rename(columns={"session_change_flag": "session_id"})
    # Wait for the kernel launches to finish before handing the frame back.
    cuda.synchronize()
    return df
# dask_cudf dataframe call to sessionize,
# after repartitioning along a user key.
# Calls get_session_id (the previous `create_session_id` name is not defined).
# NOTE(review): 60 * 60 is presumably a one-hour timeout in seconds, matching
# the `tstamp_inSec` column -- confirm.
df = df.map_partitions(get_session_id, 60 * 60)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment