Skip to content

Instantly share code, notes, and snippets.

@karol-blaszczyk
Last active March 28, 2023 08:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save karol-blaszczyk/decd2114c1c049bb836ca766d7eb40cc to your computer and use it in GitHub Desktop.
Save karol-blaszczyk/decd2114c1c049bb836ca766d7eb40cc to your computer and use it in GitHub Desktop.
import sys
import re
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col
from awsglue.transforms import *
import pg8000
import hashlib
## @params: [JOB_NAME]
# Resolve the named job arguments from the command line: the job name plus
# the database name, user, password and host used for the connections below.
# NOTE(review): db_password arrives as a plain job argument — consider whether
# a secrets store is more appropriate for this deployment.
args = getResolvedOptions(sys.argv,
['JOB_NAME',
'db_name',
'db_user',
'db_password',
'db_host'])
# Create the Spark and Glue contexts, then initialise the Glue job with the
# resolved arguments.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Connect PG8000 to database
# This connection/cursor pair runs the upsert and DROP TABLE statements later
# in the script. Port 5432 is hard-coded here and in the JDBC URL built by
# _save_to_database.
conn = pg8000.connect(database=args['db_name'],user=args['db_user'],password=args['db_password'],host=args['db_host'],port=5432)
cur = conn.cursor()
# Glue Data Catalog database and the catalog tables this job reads from
catalog_database = "reports"
# NOTE(review): "aage_report" looks like a typo for "age_report" — confirm
# against the actual catalog table name before changing it.
age_catalog = "aage_report"
gender_catalog = "gender_report"
performance_catalog = "campaign_report"
# Main Database Tables that data will be UPSERT to
campaigns_table = "campaigns"
adgroups_table = "adgroups"
reports_table = "reports"
# Temporary tables used for UPSERT'ing: each one is written via JDBC, copied
# into a main table with INSERT ... ON CONFLICT, and then dropped.
# NOTE: the identifier below is misspelled ("peformance") but is referenced
# under this exact name later in the script, so it is left unchanged.
tmp_campaigns_peformance_table = "performance_campaigns_tmp"
tmp_campaigns_age_table = "age_campaigns_tmp"
tmp_campaigns_gender_table = "gender_campaigns_tmp"
tmp_adgroup_age_table = "age_ad_groups_tmp"
tmp_adgroup_gender_table = "gender_ad_groups_tmp"
tmp_performance_report_table = "performance_report_tmp"
tmp_gender_report_table = "gender_report_tmp"
tmp_age_report_table = "age_report_tmp"
# Generate report ID
def _report_id(row):
    """Return the MD5 hex digest used as the id of a report row.

    The digest is taken over the selected field values joined with ``_``.
    Which fields are used depends on the keys present in ``row``:

    * ``'age range'`` present: campaign id, ad group id, day, type, device,
      age range;
    * otherwise ``'gender'`` present: the same fields, ending with gender;
    * neither present: campaign id and day only.

    ``row`` must support ``in`` and item access by field name.
    """
    if 'age range' in row:
        key_fields = ("campaign id", "ad group id", "day", "type", 'device', 'age range')
    elif 'gender' in row:
        key_fields = ("campaign id", "ad group id", "day", "type", 'device', 'gender')
    else:
        key_fields = ("campaign id", "day")
    # '{}'.format(value) keeps the exact text rendering of each value that
    # goes into the hashed key.
    key_text = '_'.join('{}'.format(row[field]) for field in key_fields)
    return hashlib.md5(key_text.encode()).hexdigest()
# Tag each record with its report type ('type') and hashed id ('report_id')
def map_function(dynamicRecord):
    """Add ``'type'`` and ``'report_id'`` to a record and return it.

    The type is ``'AGE'`` when the record has an ``'age range'`` key,
    ``'GENDER'`` when it has a ``'gender'`` key, and ``'PERFORMANCE'``
    otherwise. The record is modified in place; the same object is returned.
    """
    if 'age range' in dynamicRecord:
        report_type = 'AGE'
    elif 'gender' in dynamicRecord:
        report_type = 'GENDER'
    else:
        report_type = 'PERFORMANCE'
    # 'type' has to be set before hashing: _report_id reads row["type"] for
    # the age and gender variants.
    dynamicRecord['type'] = report_type
    dynamicRecord['report_id'] = _report_id(dynamicRecord)
    return dynamicRecord
# Save DynamicFrame to Database table
# Remove duplicated records before save
def _save_to_database(frame, table_name):
    """Write ``frame`` to the database table ``table_name`` over JDBC.

    The frame is converted to a Spark DataFrame and de-duplicated on the
    ``id`` column, then written with mode ``overwrite`` and the JDBC
    ``truncate`` option set to ``"true"``. Host, database name and
    credentials come from the resolved job arguments in ``args``.
    """
    unique_rows = frame.toDF().drop_duplicates(subset=['id'])
    jdbc_url = "jdbc:postgresql://{}:5432/{}".format(args['db_host'], args['db_name'])
    writer = unique_rows.write.format("jdbc").mode('overwrite')
    # Apply the writer options in the same order as a single chained call.
    writer_options = (
        ("url", jdbc_url),
        ("user", args['db_user']),
        ("password", args['db_password']),
        ("truncate", "true"),
        ("dbtable", table_name),
    )
    for option_name, option_value in writer_options:
        writer = writer.option(option_name, option_value)
    writer.save()
# Extract campaign data from report and save it to database table
def _save_campaings_from_report(frame, table_name):
    """Save the campaign id/name pairs found in ``frame`` to ``table_name``.

    Only the ``'campaign'`` and ``'campaign id'`` fields are kept; they are
    mapped to the columns ``name`` (string) and ``id`` (long) before the
    result is handed to ``_save_to_database``.
    """
    campaign_fields = SelectFields.apply(
        frame=frame,
        paths=['campaign', 'campaign id'],
        transformation_ctx='campaigns_{}'.format(table_name),
    )
    column_mappings = [
        ("campaign id", "long", "id", "long"),
        ("campaign", "string", "name", "string"),
    ]
    campaign_rows = ApplyMapping.apply(
        frame=campaign_fields,
        mappings=column_mappings,
        transformation_ctx='mapping_campaigns_{}'.format(table_name),
    )
    _save_to_database(campaign_rows, table_name)
# Extract AdGroups data from report and save it to database table
def _save_ad_groups_from_report(frame, table_name):
    """Save the ad group id/name pairs found in ``frame`` to ``table_name``.

    Only the ``'ad group'`` and ``'ad group id'`` fields are kept; they are
    mapped to the columns ``name`` (string) and ``id`` (long) before the
    result is handed to ``_save_to_database``.
    """
    ad_group_fields = SelectFields.apply(
        frame=frame,
        paths=['ad group', 'ad group id'],
        transformation_ctx='adgroup_{}'.format(table_name),
    )
    column_mappings = [
        ("ad group id", "long", "id", "long"),
        ("ad group", "string", "name", "string"),
    ]
    ad_group_rows = ApplyMapping.apply(
        frame=ad_group_fields,
        mappings=column_mappings,
        transformation_ctx='mapping_adgroup_{}'.format(table_name),
    )
    _save_to_database(ad_group_rows, table_name)
# Save Report data to database table
def _save_report(frame=None, mappings=None, table_name="", drop_fields=None):
    """Drop unwanted fields from a report, remap its columns and save it.

    Args:
        frame: the report frame to save; required.
        mappings: ApplyMapping tuples of
            ``(source, source_type, target, target_type)``. Defaults to an
            empty list.
        table_name: destination table; also used to build the
            transformation contexts.
        drop_fields: field names removed before mapping. Defaults to an
            empty list.

    Raises:
        ValueError: if ``frame`` is not supplied.
    """
    # None sentinels replace mutable list defaults, which would be shared
    # between calls.
    if frame is None:
        raise ValueError("_save_report() requires a frame")
    if mappings is None:
        mappings = []
    if drop_fields is None:
        drop_fields = []
    trimmed = DropFields.apply(frame=frame, paths=drop_fields, transformation_ctx=table_name)
    mapped = ApplyMapping.apply(
        frame=trimmed,
        mappings=mappings,
        transformation_ctx='ApplyMapping_{}'.format(table_name),
    )
    _save_to_database(mapped, table_name)
# Collect data from catalog
def _get_mapped_dynamic_frame(table_name):
    """Load catalog table ``table_name`` and tag its records.

    The frame is read from the ``catalog_database`` catalog. An empty frame
    is returned unchanged; otherwise every record is passed through
    ``map_function`` and the mapped frame is returned.
    """
    catalog_frame = glueContext.create_dynamic_frame.from_catalog(
        database=catalog_database,
        table_name=table_name,
        transformation_ctx=table_name,
    )
    has_rows = catalog_frame.count() != 0
    if has_rows:
        # Map Reports
        return Map.apply(frame=catalog_frame, f=map_function, transformation_ctx=table_name)
    return catalog_frame
#### AGE
# Load the age report; when it has rows, stage campaigns, ad groups and the
# report itself in temporary tables, upsert them into the main tables, and
# drop the temporary tables.
age_report = _get_mapped_dynamic_frame(age_catalog)
if age_report.count() > 0:
    _save_campaings_from_report(age_report, tmp_campaigns_age_table)
    _save_ad_groups_from_report(age_report, tmp_adgroup_age_table)
    _save_report(frame = age_report, table_name = tmp_age_report_table, drop_fields = ['campaign', 'ad group'], mappings = [
        ("report_id", "string", "id", "string"),
        ("type", "string", "type", "string"),
        ("campaign id", "long", "campaign_id", "long"),
        ("day", "string", "day", "date"),
        ("clicks", "long", "clicks", "long"),
        ("cost", "long", "cost", "long"),
        ("impressions", "long", "impressions", "long"),
        ("views", "long", "views", "long"),
        ("ad group id", "long", "adgroup_id", "long"),
        ("device", "string", "device", "string"),
        ("age range", "string", "age_range", "string"),
    ])
    cur.execute("INSERT INTO {} SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name".format(campaigns_table, tmp_campaigns_age_table))
    cur.execute("INSERT INTO {} SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name".format(adgroups_table,tmp_adgroup_age_table))
    # The target column list must line up with the staged columns produced by
    # the mappings above. There is no video_played mapping, so that column
    # must not be listed: with it, SELECT * supplies 11 values for 12 target
    # columns and shifts adgroup_id/device/age_range into the wrong columns.
    cur.execute("INSERT INTO {} (id, type, campaign_id, day, clicks, cost, impressions, views, adgroup_id, device, age_range) SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET clicks = EXCLUDED.clicks, cost = EXCLUDED.cost, impressions = EXCLUDED.impressions, views = EXCLUDED.views".format(reports_table, tmp_age_report_table))
    cur.execute("DROP TABLE {}".format(tmp_campaigns_age_table))
    cur.execute("DROP TABLE {}".format(tmp_adgroup_age_table))
    cur.execute("DROP TABLE {}".format(tmp_age_report_table))
### GENDER
# Load the gender report; when it has rows, stage campaigns, ad groups and
# the report itself in temporary tables, upsert them into the main tables,
# and drop the temporary tables.
gender_report = _get_mapped_dynamic_frame(gender_catalog)
if gender_report.count() > 0:
    _save_campaings_from_report(gender_report, tmp_campaigns_gender_table)
    _save_ad_groups_from_report(gender_report, tmp_adgroup_gender_table)
    gender_mappings = [
        ("report_id", "string", "id", "string"),
        ("type", "string", "type", "string"),
        ("campaign id", "long", "campaign_id", "long"),
        ("day", "string", "day", "date"),
        ("clicks", "long", "clicks", "long"),
        ("cost", "long", "cost", "long"),
        ("impressions", "long", "impressions", "long"),
        ("views", "long", "views", "long"),
        ("ad group id", "long", "adgroup_id", "long"),
        ("device", "string", "device", "string"),
        ("gender", "string", "gender", "string"),
    ]
    _save_report(
        frame=gender_report,
        table_name=tmp_gender_report_table,
        drop_fields=['campaign', 'ad group'],
        mappings=gender_mappings,
    )
    # Upsert campaign and ad group names first, then the report rows.
    gender_name_upsert = "INSERT INTO {} SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name"
    for gender_target, gender_staging in (
        (campaigns_table, tmp_campaigns_gender_table),
        (adgroups_table, tmp_adgroup_gender_table),
    ):
        cur.execute(gender_name_upsert.format(gender_target, gender_staging))
    gender_report_upsert = "INSERT INTO {} (id, type, campaign_id, day, clicks, cost, impressions, views, adgroup_id, device, gender) SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET clicks = EXCLUDED.clicks, cost = EXCLUDED.cost, impressions = EXCLUDED.impressions, views = EXCLUDED.views"
    cur.execute(gender_report_upsert.format(reports_table, tmp_gender_report_table))
    # The staging tables are no longer needed once their rows are upserted.
    for gender_staging in (
        tmp_campaigns_gender_table,
        tmp_adgroup_gender_table,
        tmp_gender_report_table,
    ):
        cur.execute("DROP TABLE {}".format(gender_staging))
# PERFORMANCE
# Load the campaign performance report; when it has rows, stage campaigns and
# the report itself in temporary tables, upsert them into the main tables,
# and drop the temporary tables. This report has no ad group data.
performance_report = _get_mapped_dynamic_frame(performance_catalog)
if performance_report.count() > 0:
    _save_campaings_from_report(performance_report, tmp_campaigns_peformance_table)
    performance_mappings = [
        ("report_id", "string", "id", "string"),
        ("type", "string", "type", "string"),
        ("campaign id", "long", "campaign_id", "long"),
        ("day", "string", "day", "date"),
        ("clicks", "long", "clicks", "long"),
        ("cost", "long", "cost", "long"),
        ("impressions", "long", "impressions", "long"),
        ("views", "long", "views", "long"),
    ]
    _save_report(
        frame=performance_report,
        table_name=tmp_performance_report_table,
        drop_fields=['campaign'],
        mappings=performance_mappings,
    )
    performance_name_upsert = "INSERT INTO {} SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name"
    cur.execute(performance_name_upsert.format(campaigns_table, tmp_campaigns_peformance_table))
    performance_report_upsert = "INSERT INTO {} (id, type, campaign_id, day, clicks, cost, impressions, views) SELECT * FROM {} ON CONFLICT (id) DO UPDATE SET clicks = EXCLUDED.clicks, cost = EXCLUDED.cost, impressions = EXCLUDED.impressions, views = EXCLUDED.views"
    cur.execute(performance_report_upsert.format(reports_table, tmp_performance_report_table))
    # The staging tables are no longer needed once their rows are upserted.
    for performance_staging in (
        tmp_campaigns_peformance_table,
        tmp_performance_report_table,
    ):
        cur.execute("DROP TABLE {}".format(performance_staging))
# Commit the statements executed through `cur` (the upserts and DROP TABLEs),
# then release the cursor and the pg8000 connection.
# NOTE(review): if any statement above raises, none of these lines run and the
# connection is left open — consider try/finally if that matters here.
conn.commit()
cur.close()
conn.close()
# Commit the Glue job last, after the database work has succeeded
# (presumably this is what persists job bookmark state — confirm in Glue docs).
job.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment