# USING PANDAS
import logging
import os

import pandas as pd

logging.basicConfig(level=logging.INFO)

current_dir = os.getcwd()
logging.info('Datasets will be read from current directory %s', current_dir)

# Load both input files from the working directory.
dataset1 = pd.read_csv(os.path.join(current_dir, 'dataset1.csv'))
dataset2 = pd.read_csv(os.path.join(current_dir, 'dataset2.csv'))
logging.info('Imported datasets as pandas DFs with shapes %s and %s', dataset1.shape, dataset2.shape)
# Target dtypes for the merged frame; astype() converts only the listed columns.
schema = {'invoice_id': 'int32',
          'legal_entity': 'string',
          'counter_party': 'string',
          'rating': 'int32',
          'status': 'string',
          'value': 'int32'}

logging.info('Left joining dataset1 with dataset2...')
merged_df = pd.merge(dataset1, dataset2, on='counter_party', how='left')
merged_df = merged_df.astype(schema)
merged_df.head(20)
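
# Optional sanity check (not in the original gist): a minimal sketch assuming
# counter_party is unique in dataset2, so the left join cannot fan rows out.
# pd.merge's validate argument raises MergeError if that assumption is wrong.
pd.merge(dataset1, dataset2, on='counter_party', how='left', validate='m:1')
assert len(merged_df) == len(dataset1), 'left join unexpectedly changed the row count'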
#######
# Boolean masks computed on the full merged frame; inside each group, the
# lambdas below align on the index, so only that group's ARAP/ACCR rows are summed.
cond_ARAP = merged_df['status'] == 'ARAP'
cond_ACCR = merged_df['status'] == 'ACCR'
agg_ = {'rating': ['max'],
        'value': [lambda x: x[cond_ARAP].sum(),
                  lambda x: x[cond_ACCR].sum()]}
agg_df = merged_df.groupby(['legal_entity', 'counter_party', 'tier']).agg(agg_)
# Flatten the ('value', '<lambda_0>')-style MultiIndex columns, then rename.
agg_df.columns = agg_df.columns.map('_'.join)
agg_df = agg_df.reset_index().rename(columns={'counter_party': 'counterparty',
                                              'rating_max': 'max_rating_by_cp',
                                              'value_<lambda_0>': 'sum_ARAP',
                                              'value_<lambda_1>': 'sum_ACCR'})
agg_df.to_csv(os.path.join(current_dir, 'output_agg_df.csv'))
agg_df.head(10)
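
# Alternative sketch (not in the original gist): pandas named aggregation yields
# the final column names directly and avoids renaming '<lambda_n>' columns.
agg_df_named = (merged_df
                .groupby(['legal_entity', 'counter_party', 'tier'])
                .agg(max_rating_by_cp=('rating', 'max'),
                     sum_ARAP=('value', lambda x: x[cond_ARAP].sum()),
                     sum_ACCR=('value', lambda x: x[cond_ACCR].sum()))
                .reset_index()
                .rename(columns={'counter_party': 'counterparty'}))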
#######
# USING PYSPARK - INCLUDES CUBE CALCULATION
import logging
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, sum  # NB: shadows the built-in max/sum

logging.basicConfig(level=logging.INFO)
spark = SparkSession.builder.appName("Join Datasets").getOrCreate()

current_dir = os.getcwd()
logging.info('Datasets will be read from current directory %s', current_dir)
# inferSchema makes rating/value numeric instead of the CSV default of string.
dataset1 = spark.read.format("csv").option("header", "true") \
    .option("inferSchema", "true").load(current_dir + '/dataset1.csv')
dataset2 = spark.read.format("csv").option("header", "true") \
    .option("inferSchema", "true").load(current_dir + '/dataset2.csv')

logging.info('Left joining dataset1 with dataset2...')
merged_df = dataset1.join(dataset2, on='counter_party', how='left')
merged_df.show()
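
# Alternative sketch (not in the original gist): declare the schema up front
# instead of inferring it. The exact split of columns between dataset1 and
# dataset2 is an assumption here, mirroring the pandas schema dict above.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
dataset1_schema = StructType([
    StructField('invoice_id', IntegerType()),
    StructField('legal_entity', StringType()),
    StructField('counter_party', StringType()),
    StructField('rating', IntegerType()),
    StructField('status', StringType()),
    StructField('value', IntegerType()),
])
# dataset1 = spark.read.csv(current_dir + '/dataset1.csv', header=True, schema=dataset1_schema)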
########
# 1/0 flags so that sum(value * flag) adds 'value' only for the matching status.
cond_ARAP = (col("status") == 'ARAP').cast("int")
cond_ACCR = (col("status") == 'ACCR').cast("int")
# LAZY EVALUATION: nothing executes until show() below triggers the plan.
agg_df = merged_df.groupby(['legal_entity', 'counter_party', 'tier']) \
                  .agg(max('rating').alias('max_rating_by_cp'),
                       sum(col("value") * cond_ARAP).alias('ARAP_sum'),
                       sum(col("value") * cond_ACCR).alias('ACCR_sum'))
agg_df.show()
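
# Equivalent sketch (not in the original gist) using when/otherwise, which reads
# more explicitly than multiplying by a 1/0 flag.
from pyspark.sql.functions import when
agg_df_when = merged_df.groupby('legal_entity', 'counter_party', 'tier') \
    .agg(max('rating').alias('max_rating_by_cp'),
         sum(when(col('status') == 'ARAP', col('value')).otherwise(0)).alias('ARAP_sum'),
         sum(when(col('status') == 'ACCR', col('value')).otherwise(0)).alias('ACCR_sum'))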
########
# cube() aggregates over every combination of the three keys, including
# subtotal rows where a key is null (i.e. rolled up across that dimension).
cube_df = merged_df.cube('legal_entity', 'counter_party', 'tier') \
                   .agg(sum('value').alias('tot_value'),
                        max('rating').alias('max_rating_by_cp'),
                        sum(col("value") * cond_ARAP).alias('ARAP_sum'),
                        sum(col("value") * cond_ACCR).alias('ACCR_sum')) \
                   .orderBy(col('legal_entity').asc_nulls_last())
cube_df.show()
# Note: Spark writes a directory of part files, not a single CSV file.
cube_df.write.options(header='True', delimiter=',') \
    .csv(current_dir + '/output_cube_df.csv')
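
# Optional sketch (not in the original gist): grouping_id() tags each cube row
# with a bitmask of which keys were rolled up, so subtotal rows are filterable.
from pyspark.sql.functions import grouping_id
cube_tagged = merged_df.cube('legal_entity', 'counter_party', 'tier') \
    .agg(grouping_id().alias('gid'), sum('value').alias('tot_value'))
cube_tagged.filter(col('gid') == 0).show()  # gid == 0 keeps only fully-grouped rows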