· 6 keys, each repeating 4,252,792 times
· 6 keys, each repeating 3,147,929 times
· 6 keys, each repeating 1,983,560 times
· …
· 600 keys, each repeating 3 times
· 1,206 keys, each repeating 2 times
· 594 keys, each repeating 1 time
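A distribution like this can be produced with a small helper; the sketch below is illustrative (the DataFrame df and the key column list are placeholders, not names from the original code). It first counts the rows per key combination (the key's "weight"), then counts how many keys share each weight.

from pyspark.sql import functions as F

def skew_report(df, key_cols):
    # Weight of each key combination = number of rows it appears in.
    weights = df.groupBy(key_cols).agg(F.count(F.lit(1)).alias('weight'))
    # For each distinct weight, how many keys share it; heaviest first.
    return (weights.groupBy('weight')
                   .agg(F.count(F.lit(1)).alias('num_keys'))
                   .orderBy(F.desc('weight')))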
join_columns = ['year_week', 'line_item_id']
df_join = bin_packing.left_join_with_skew_key(spark_session=spark_session,
                                              df_left_src=demand_supply_df,
                                              df_right_src=li_sum_predict_requests_df,
                                              left_col_names_list=join_columns,
                                              right_col_names_list=join_columns,
                                              brodcast_hint=True,
                                              num_of_partitions=None)
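The returned df_join can be used like the result of an ordinary left join. As a quick sanity check (a sketch, assuming the data is small enough to count in full), the skew-aware join should yield the same number of rows as the straightforward one:

# Hypothetical sanity check: row counts should match a plain left join.
df_plain = demand_supply_df.join(li_sum_predict_requests_df,
                                 on=join_columns, how='left')
assert df_join.count() == df_plain.count()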
SELECT
    l.col1,
    l.col2, … ,
    r.col1,
    r.col2, …
FROM
    left_tbl AS l
LEFT OUTER JOIN (SELECT
                     /*+ BROADCAST(right_tbl) */
                     *
                 FROM right_tbl) AS r
    ON l.col1 = r.col1 AND l.col2 = r.col2 …  -- the embedded snippet is cut off here; the subquery close and join condition are reconstructed for completeness
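For reference, the same broadcast behavior can be requested through the standard DataFrame API instead of a SQL hint (table and column names here are illustrative):

from pyspark.sql.functions import broadcast

# Equivalent DataFrame-API form: hint Spark to broadcast the right side.
df_joined = left_df.join(broadcast(right_df), on=['col1', 'col2'], how='left')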
def add_join_id_column(spark_session, df_src, df_weights, src_col_name, weights_col_name):
    """
    :param spark_session: the Spark session
    :param df_src: Spark DataFrame that needs to be updated with a 'join_id' column
    :param df_weights: Spark DataFrame containing weights_col_name and 'join_id'
    :param src_col_name: name of the key column in df_src
    :param weights_col_name: name of the key column in df_weights
    :return: Spark DataFrame (df_src with a 'join_id' column added)
    """
from operator import itemgetter

def bin_packing(weights_list, max_weight_in_bin=None):
    """
    :param weights_list: Python list of dicts, one per key (or key combination), each with a
        'weight' entry. The weight is the number of rows for that key.
    :param max_weight_in_bin: must be equal to or greater than the largest weight in
        weights_list. If None, the largest weight in the list is used.
    :return: weights_list with a bin id assigned to each key
    """
    bin_id = 1
    # Sort heaviest-first so the packing behaves like first-fit decreasing.
    weights_list = sorted(weights_list, key=itemgetter('weight'), reverse=True)
    if max_weight_in_bin is None:
        max_weight_in_bin = weights_list[0]['weight']
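The embedded snippet ends after this setup. Under the assumption that the function implements first-fit decreasing, the remaining loop might look roughly like this sketch (not the library's code):

    # Sketch of the rest of the loop (assumed, not the original implementation).
    bins = []  # each bin tracks its id and remaining capacity
    for item in weights_list:
        for b in bins:
            if item['weight'] <= b['free']:
                b['free'] -= item['weight']
                item['bin_id'] = b['bin_id']
                break
        else:
            # No existing bin fits this key: open a new bin.
            bins.append({'bin_id': bin_id, 'free': max_weight_in_bin - item['weight']})
            item['bin_id'] = bin_id
            bin_id += 1
    return weights_list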
weights_query = '''SELECT %s, count(1) AS weight FROM left_table GROUP BY %s ORDER BY weight DESC''' % (left_col_name, left_col_name)
df_join_key_weights = spark_session.sql(weights_query)
# Collect the per-key weights into a Python list of dicts.
spark_session.sparkContext.setJobGroup(GROUP_ID, "collect rdd to python list (counting the number of repeated keys)")
list_join_key_weights = [{left_col_name: i[left_col_name], 'weight': i['weight']}
                         for i in df_join_key_weights.select(left_col_name, 'weight').rdd.collect()]
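From here, the collected list can feed the bin-packing step, and its output can go back into a small Spark DataFrame that maps each key to its bin. The wiring below is an assumption about how the pieces connect, not the library's exact flow:

# Assumed wiring: pack keys into bins, then build a key -> bin id mapping DataFrame.
packed = bin_packing(list_join_key_weights, max_weight_in_bin=None)
df_weights = spark_session.createDataFrame(packed)  # columns: key column, 'weight', bin id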
sql_query = '''SELECT col1, col2, … , count(1) AS weight FROM left_table_name GROUP BY col1, col2, … ORDER BY weight DESC'''
df_join_key_count = spark.sql(sql_query)
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the modern equivalent.
left_df.registerTempTable('left_table_name')
sql_query = '''SELECT l_col, count(1) AS weight FROM left_table_name GROUP BY l_col ORDER BY weight DESC'''
df_join_weights = spark.sql(sql_query)