Skip to content

Instantly share code, notes, and snippets.

@tolufakiyesi
Last active July 24, 2023 22:05
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save tolufakiyesi/b754c3b9eb3e8bbf247400331e790459 to your computer and use it in GitHub Desktop.
Save tolufakiyesi/b754c3b9eb3e8bbf247400331e790459 to your computer and use it in GitHub Desktop.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
# Static job configuration: the catalog database name and the S3 destination
# (path and file format) of the final result.
DATABASE = "data-pipeline-lake-staging"
OUTPUT_FILE_FORMAT = "parquet"
OUTPUT_PATH = "s3://data-store-staging/ordered/"

# Resolve the named arguments passed to the script on the command line.
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

# Build the Spark and Glue contexts, keep a handle on the Spark session for
# SQL queries, then initialise the job under the name it was launched with.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
## @type: DataSource
## @args: [database = "data-pipeline-lake-staging", table_name = "profiles", transformation_ctx = "profiles_source"]
## @return: profiles_source
## @inputs: []
# Read the "profiles" table through the Glue Data Catalog. The database name
# comes from the DATABASE constant instead of a repeated string literal.
profiles_source = glueContext.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name="profiles",
    transformation_ctx="profiles_source",
)

# (column name, type) for every column kept from the source. Each column keeps
# its name and type, so the ApplyMapping 4-tuples are derived from these pairs
# rather than being spelled out twice.
profile_column_types = [
    ("user_id", "string"),
    ("gender", "string"),
    ("age", "string"),
    ("year_of_birth", "string"),
    ("address_continent", "string"),
    ("address_region", "string"),
    ("address_country", "string"),
    ("address_lga", "string"),
    ("address_state", "string"),
    ("phone_type", "string"),
    ("profile_value", "string"),
    ("browser_name", "string"),
    ("browser_version", "string"),
    ("phone_manufacturer", "string"),
    ("phone_model", "string"),
    ("os_vendor", "string"),
    ("os_name", "string"),
    ("os_version", "string"),
    ("os_sub_version", "string"),
    ("spend_total", "double"),
    ("channel", "string"),
    ("service_name", "string"),
    ("service_cost", "string"),
    ("interest", "string"),
    ("recharge_mode", "string"),
    ("airtime_recharge", "string"),
    ("prior_recharge", "string"),
    ("post_recharge", "string"),
    ("useragent", "string"),
]
profiles_mappings = [
    (name, dtype, name, dtype) for name, dtype in profile_column_types
]

## @type: ApplyMapping
## @args: [mappings = <profiles_mappings>, transformation_ctx = "applymapping_profiles"]
## @return: applymapping_profiles
## @inputs: [frame = profiles_source]
applymapping_profiles = ApplyMapping.apply(
    frame=profiles_source,
    mappings=profiles_mappings,
    transformation_ctx="applymapping_profiles",
)

# NOTE: an earlier SelectFields + ResolveChoice(MATCH_CATALOG) branch was
# dropped here. It selected columns that the mapping above does not produce
# (tap_id, msisdn, location_*, ...), matched against the unrelated table
# "terragon_sterling_profiles", and its result was never used downstream.
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoiceprofiles1"]
## @return: resolvechoiceprofiles1
## @inputs: [frame = applymapping_profiles]
resolvechoiceprofiles1 = ResolveChoice.apply(
    frame=applymapping_profiles,
    choice="make_struct",
    transformation_ctx="resolvechoiceprofiles1",
)
## @type: DataSource
## @args: [database = "data-pipeline-lake-staging", table_name = "selected", transformation_ctx = "selected_source"]
## @return: selected_source
## @inputs: []
# NOTE(review): the previous annotation named database
# "data-pipeline-lake-new-db" and table "demography", while the code has always
# read the "selected" table from the staging database - confirm the intended
# source location.
selected_source = glueContext.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name="selected",
    transformation_ctx="selected_source",
)

## @type: ApplyMapping
## @args: [mappings = [("user_id", "string", "user_id", "string"), ("column_count", "int", "column_count", "int")], transformation_ctx = "applymapping_selected"]
## @return: applymapping_selected
## @inputs: [frame = selected_source]
# Keep only the join key and the per-user count, with unchanged names/types.
applymapping_selected = ApplyMapping.apply(
    frame=selected_source,
    mappings=[
        ("user_id", "string", "user_id", "string"),
        ("column_count", "int", "column_count", "int"),
    ],
    transformation_ctx="applymapping_selected",
)

## @type: SelectFields
## @args: [paths = ["user_id", "column_count"], transformation_ctx = "selected_fields"]
## @return: selected_fields
## @inputs: [frame = applymapping_selected]
selected_fields = SelectFields.apply(
    frame=applymapping_selected,
    paths=["user_id", "column_count"],
    transformation_ctx="selected_fields",
)

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "data-pipeline-lake-staging", table_name = "selected", transformation_ctx = "resolvechoiceselected0"]
## @return: resolvechoiceselected0
## @inputs: [frame = selected_fields]
# Resolve ambiguous column types against the catalog definition of "selected".
resolvechoiceselected0 = ResolveChoice.apply(
    frame=selected_fields,
    choice="MATCH_CATALOG",
    database=DATABASE,
    table_name="selected",
    transformation_ctx="resolvechoiceselected0",
)

## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoiceselected1"]
## @return: resolvechoiceselected1
## @inputs: [frame = resolvechoiceselected0]
resolvechoiceselected1 = ResolveChoice.apply(
    frame=resolvechoiceselected0,
    choice="make_struct",
    transformation_ctx="resolvechoiceselected1",
)
# Convert both resolved DynamicFrames to Spark DataFrames and register them as
# temporary views so they can be joined with Spark SQL.
profiles_df = resolvechoiceprofiles1.toDF()
selected_df = resolvechoiceselected1.toDF()
profiles_df.createOrReplaceTempView("profiles_temp_table")
selected_df.createOrReplaceTempView("selected_temp_table")

# LEFT JOIN keeps every profile row; profiles without a matching user_id in
# the "selected" view get a NULL column_count.
join_query = """
SELECT
A.user_id,
A.gender,
A.age,
A.year_of_birth,
A.address_continent,
A.address_region,
A.address_country,
A.address_lga,
A.address_state,
A.phone_type,
A.profile_value,
A.browser_name,
A.browser_version,
A.phone_manufacturer,
A.phone_model,
A.os_vendor,
A.os_name,
A.os_version,
A.os_sub_version,
A.spend_total,
A.channel,
A.service_name,
A.service_cost,
A.interest,
B.column_count
FROM profiles_temp_table A
LEFT JOIN selected_temp_table B
ON A.user_id=B.user_id
"""
consolidated_df = spark.sql(join_query)

# Highest column_count first.
output_df = consolidated_df.sort(consolidated_df["column_count"].desc())
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": OUTPUT_PATH}, format = OUTPUT_FILE_FORMAT, transformation_ctx = "datasink_output"]
## @return: datasink_output
## @inputs: [frame = consolidated_dynamicframe]
# Collapse to a single partition so one output file is produced. coalesce(1)
# is used instead of repartition(1): repartition always shuffles the rows, so
# the descending column_count order of output_df would not be guaranteed in
# the written file, whereas coalesce merges the existing partitions without a
# shuffle.
single_partition_df = output_df.coalesce(1)
consolidated_dynamicframe = DynamicFrame.fromDF(
    single_partition_df,
    glueContext,
    "consolidated_dynamicframe",
)

# Write the ordered result to S3 in the configured format.
datasink_output = glueContext.write_dynamic_frame.from_options(
    frame=consolidated_dynamicframe,
    connection_type="s3",
    connection_options={"path": OUTPUT_PATH},
    format=OUTPUT_FILE_FORMAT,
    transformation_ctx="datasink_output",
)

# Mark the job run as finished.
job.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment