Skip to content

Instantly share code, notes, and snippets.

# Expose the left-side DataFrame to Spark SQL under a temporary view name.
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0; the view name and its semantics are unchanged.
left_df.createOrReplaceTempView('left_table_name')

# Single join column: count the rows per key value, most frequent key first.
sql_query = '''SELECT l_col ,count(1) as weight from left_table_name group by l_col order by weight desc'''
df_join_weights = spark.sql(sql_query)

# Composite join key: same count, grouped by several columns.
# NOTE(review): the "…" in the query is a placeholder for the remaining join
# columns; replace it with the real column list before running the statement.
sql_query = '''SELECT col1, col2,… ,count(1) as weight from left_table_name group by col1,col2,… order by weight desc'''
df_join_key_count = spark.sql(sql_query)
# Count the rows per value of the left join column, most frequent value first.
weights_query = (
    '''SELECT %s ,count(1) as weight from left_table group by %s order by weight desc'''
    % (left_col_name, left_col_name)
)
df_join_key_weights = spark_session.sql(weights_query)

# Set the job group (id + description) before the collect below runs.
spark_session.sparkContext.setJobGroup(
    GROUP_ID,
    "collect rdd to python list (counting the number of repeated keys)",
)

# Bring the per-key counts to the driver as a list of plain dicts:
# [{<left_col_name>: <key value>, 'weight': <row count>}, ...]
list_join_key_weights = []
for row in df_join_key_weights.select(left_col_name, 'weight').rdd.collect():
    list_join_key_weights.append({left_col_name: row[left_col_name], 'weight': row['weight']})
def bin_packing(weights_list, max_weight_in_bin=None):
    """
    Prepare weighted join keys for bin packing.

    NOTE(review): only the setup steps of this function are visible in this
    excerpt; the packing loop and the return statement are not shown.

    :param weights_list: Python list of dict-like items, each holding a 'weight' entry.
        The weight is the number of times the key (or combination of keys) occurs.
    :param max_weight_in_bin: must be equal to or greater than the largest weight in
        weights_list. If None, the largest weight in the list is used.
    :return: not visible in this excerpt -- TODO document once the full body is available
    """
    # Starting bin id; how it is advanced is not visible in this excerpt.
    bin_id = 1
    # Heaviest keys first, so index 0 holds the largest weight.
    # NOTE(review): requires `from operator import itemgetter`; the import is not visible here.
    weights_list = sorted(weights_list, key=itemgetter('weight'), reverse=True)
    if max_weight_in_bin is None:
        # NOTE(review): an empty weights_list raises IndexError on this line.
        max_weight_in_bin = weights_list[0]['weight']
def add_join_id_column(spark_session, df_src, df_weights, src_col_name, weights_col_name):
    """
    Update df_src with a join_id column.

    NOTE(review): the implementation is not visible in this excerpt; the
    description below is based on the signature and the original docstring only.

    :param spark_session: the Spark session
    :param df_src: Spark DataFrame that needs to be updated with join_id
    :param df_weights: Spark DataFrame with the weights_col_name and 'join_id' columns
    :param src_col_name: presumably the column of df_src that is matched against
        weights_col_name -- TODO confirm against the implementation
    :param weights_col_name: name of the column in df_weights that accompanies 'join_id'
    :return: Spark DataFrame
    """
SELECT
l.col1,
l.col2… ,
r.col1,
r.col2…
FROM
left_tbl as l
LEFT OUTER JOIN ( SELECT
/*+ BROADCAST (right_tbl) */
*
# The same column names are passed as the join key for both DataFrames.
join_columns = ['year_week', 'line_item_id']

# NOTE(review): `brodcast_hint` looks like a misspelling of "broadcast", but a
# keyword argument must match the parameter name declared by
# left_join_with_skew_key -- verify the callee's signature before renaming it.
df_join = bin_packing.left_join_with_skew_key(
    spark_session=spark_session,
    df_left_src=demand_supply_df,
    df_right_src=li_sum_predict_requests_df,
    left_col_names_list=join_columns,
    right_col_names_list=join_columns,
    brodcast_hint=True,
    num_of_partitions=None,
)
· 6 keys, each repeated 4252792 times
· 6 keys, each repeated 3147929 times
· 6 keys, each repeated 1983560 times
· 600 keys, each repeated 3 times
· 1206 keys, each repeated 2 times
· 594 keys, each appearing only once