Skip to content

Instantly share code, notes, and snippets.

@ijan10
Created February 27, 2019 15:43
Show Gist options
  • Save ijan10/a78ee90328aac7c69687b685057d889b to your computer and use it in GitHub Desktop.
Save ijan10/a78ee90328aac7c69687b685057d889b to your computer and use it in GitHub Desktop.
def add_join_id_column(spark_session, df_src, df_weights, src_col_name, weights_col_name):
    """
    Left-join ``df_src`` with ``df_weights`` to attach the join-id column ``BIN_ID``.

    All columns of ``df_src`` are kept and the ``BIN_ID`` column of ``df_weights``
    is appended. Rows of ``df_src`` with no matching key get NULL. If
    ``weights_col_name`` is not unique in ``df_weights``, matching source rows
    are duplicated (ordinary LEFT OUTER JOIN semantics). ``df_weights`` carries
    a BROADCAST hint, so it is expected to be small.

    :param spark_session: the spark session used to run the SQL query
    :param df_src: spark dataframe which needs to be updated with the join id
    :param df_weights: spark dataframe with ``weights_col_name`` and the
        ``BIN_ID`` (join id) column
    :param src_col_name: join key column in ``df_src``
    :param weights_col_name: join key column in ``df_weights``
    :return: spark dataframe with the columns of ``df_src`` plus ``BIN_ID``
    """
    import uuid

    def _quote_identifier(identifier):
        # Backquote so names containing spaces, dots or reserved words parse;
        # an embedded backquote is escaped by doubling it.
        return '`' + identifier.replace('`', '``') + '`'

    # Per-call unique view names: fixed names would overwrite (and then drop)
    # any temp view of the same name the caller owns, and would collide when
    # the function is used concurrently on one session.
    suffix = uuid.uuid4().hex
    src_table_name = 'src_table_' + suffix
    weights_table_name = 'weights_table_' + suffix

    selected_columns = (
        ['l.' + _quote_identifier(col_name) for col_name in df_src.columns]
        + ['r.' + _quote_identifier(BIN_ID)]
    )
    query_add_join_id = (
        'SELECT %s\n'
        'FROM %s AS l\n'
        'LEFT OUTER JOIN (SELECT /*+ BROADCAST (%s) */ * FROM %s) AS r\n'
        'ON l.%s = r.%s'
    ) % (
        ', '.join(selected_columns),
        src_table_name,
        weights_table_name,
        weights_table_name,
        _quote_identifier(src_col_name),
        _quote_identifier(weights_col_name),
    )

    try:
        df_src.createOrReplaceTempView(src_table_name)
        df_weights.createOrReplaceTempView(weights_table_name)
        print_query(query_add_join_id)
        df_left_with_join_id = spark_session.sql(query_add_join_id)
    finally:
        # Always unregister both views, even if the query fails.
        # NOTE(review): dropping them before the result is used assumes
        # spark.sql has already resolved the views into the returned plan —
        # confirm for the Spark version in use.
        spark_session.catalog.dropTempView(src_table_name)
        spark_session.catalog.dropTempView(weights_table_name)
    return df_left_with_join_id
# Tag every row of both inputs with its bin id from bin_packing_df, then
# repartition each side into num_of_partitions partitions by
# (BIN_ID, that side's own key column).
df_left = add_join_id_column(
    spark_session=spark_session,
    df_src=df_left,
    df_weights=bin_packing_df,
    src_col_name=left_col_name,
    weights_col_name=left_col_name,
).repartition(num_of_partitions, BIN_ID, left_col_name)

# The right side is matched against bin_packing_df's key column, which is
# named left_col_name (presumably bin_packing_df was built from the left-side
# keys — confirm where it is created).
df_right = add_join_id_column(
    spark_session=spark_session,
    df_src=df_right,
    df_weights=bin_packing_df,
    src_col_name=right_col_name,
    weights_col_name=left_col_name,
).repartition(num_of_partitions, BIN_ID, right_col_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment