# AWS Glue ETL job script, originally shared as GitHub gist
# brentarias/9239361331827d3689f1d2dc2c659ee9 (created May 17, 2019).
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
# Job bootstrap: pull the job name out of the command-line arguments, then
# build the Spark/Glue contexts every later step of this script uses.
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME"],
)
sc = SparkContext()
glueContext = GlueContext(sc)
# SparkSession handle; nothing below references it.
spark = glueContext.spark_session
# job.init() here is paired with the job.commit() at the end of the script.
job = Job(glueContext)
_job_name = args["JOB_NAME"]
job.init(_job_name, args)
## @type: DataSource
## @args: [database = "tesseract", table_name = "tesgs_soi_product_bucket", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
# Read the source product table from the Glue Data Catalog as a DynamicFrame.
_source_database = "tesseract"
_source_table = "tesgs_soi_product_bucket"
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=_source_database,
    table_name=_source_table,
    transformation_ctx="datasource0",
)
## @type: ApplyMapping
## @args: [mapping = [("hierarchy", "struct", "hierarchy", "struct"), ("group", "struct", "group", "struct"), ("master", "struct", "master", "struct"), ("sku", "string", "sku", "string"), ("display_title", "string", "display_title", "string"), ("erp_title", "string", "erp_title", "string"), ("item_type", "string", "item_type", "string"), ("item_number", "int", "item_number", "int"), ("lifecycle_status_code", "int", "lifecycle_status_code", "int"), ("lifecycle_status", "string", "lifecycle_status", "string"), ("category_name", "string", "category_name", "string"), ("category_code", "string", "category_code", "string"), ("category_type", "string", "category_type", "string"), ("genre_subgenre_id", "int", "genre_subgenre_id", "int"), ("genre_name", "string", "genre_name", "string"), ("sub_genre_name", "string", "sub_genre_name", "string"), ("allspark_id", "string", "allspark_id", "string"), ("barcodes", "struct", "barcodes", "struct"), ("vendor_part_number", "string", "vendor_part_number", "string"), ("vendor_id", "int", "vendor_id", "int"), ("vendor_name", "string", "vendor_name", "string"), ("publisher_id", "int", "publisher_id", "int"), ("publisher_name", "string", "publisher_name", "string"), ("esrb_rating", "string", "esrb_rating", "string"), ("esrb_descriptors", "string", "esrb_descriptors", "string"), ("is_embargoed", "boolean", "is_embargoed", "boolean"), ("embargo_date", "string", "embargo_date", "string"), ("is_streetdate_enforced", "boolean", "is_streetdate_enforced", "boolean"), ("street_date", "string", "street_date", "string"), ("is_prp_eligible", "string", "is_prp_eligible", "string"), ("prp_group", "string", "prp_group", "string"), ("is_gpg_eligible", "string", "is_gpg_eligible", "string"), ("is_sellable", "boolean", "is_sellable", "boolean"), ("is_real_time_message", "boolean", "is_real_time_message", "boolean"), ("family_name", "string", "family_name", "string"), ("developer_name", "string", "developer_name", "string"), ("is_drop_ship", "boolean", "is_drop_ship", "boolean"), ("legacy_product_id", "int", "legacy_product_id", "int"), ("mpaa_rating", "string", "mpaa_rating", "string"), ("brand_name", "string", "brand_name", "string"), ("manufacturer_name", "string", "manufacturer_name", "string"), ("platform_name", "string", "platform_name", "string"), ("report_category_code", "string", "report_category_code", "string"), ("report_category_name", "string", "report_category_name", "string"), ("sub_category_code", "int", "sub_category_code", "int"), ("sub_category_name", "string", "sub_category_name", "string"), ("product_page_type", "int", "product_page_type", "int"), ("ecp_genre_subgenre_id", "int", "ecp_genre_subgenre_id", "int"), ("ecp_genre_name", "string", "ecp_genre_name", "string"), ("ecp_subgenre_name", "string", "ecp_subgenre_name", "string"), ("ip_owner_name", "string", "ip_owner_name", "string"), ("franchise_name", "string", "franchise_name", "string"), ("character_name", "string", "character_name", "string"), ("common_title", "string", "common_title", "string"), ("seasonality", "string", "seasonality", "string"), ("seasonality_year", "string", "seasonality_year", "string"), ("non_gaap_commission_type", "string", "non_gaap_commission_type", "string"), ("non_gaap_amount", "string", "non_gaap_amount", "string"), ("digital_publisher_commission_type", "string", "digital_publisher_commission_type", "string"), ("digital_publisher_amount", "string", "digital_publisher_amount", "string"), ("digital_bundle_credit_type", "string", "digital_bundle_credit_type", "string"), ("digital_bundle_amount", "string", "digital_bundle_amount", "string"), ("end_of_life_date", "string", "end_of_life_date", "string"), ("size", "string", "size", "string"), ("gender", "string", "gender", "string"), ("material", "string", "material", "string"), ("color", "string", "color", "string"), ("color_tech", "string", "color_tech", "string"), ("memory_size", "string", "memory_size", "string"), ("oem", "string", "oem", "string"), ("connectivity", "string", "connectivity", "string"), ("edition", "string", "edition", "string"), ("carriers", "string", "carriers", "string"), ("online_override_street_date", "string", "online_override_street_date", "string"), ("preload_date", "string", "preload_date", "string"), ("eta_date", "string", "eta_date", "string"), ("first_received_date", "string", "first_received_date", "string"), ("last_master_id", "int", "last_master_id", "int"), ("masterpack_qty", "int", "masterpack_qty", "int"), ("country_of_origin", "string", "country_of_origin", "string"), ("isbn", "string", "isbn", "string"), ("return_type", "string", "return_type", "string"), ("is_emp_discount_allowed", "boolean", "is_emp_discount_allowed", "boolean"), ("commission", "double", "commission", "double"), ("sales_price", "double", "sales_price", "double"), ("use_default_tradein", "boolean", "use_default_tradein", "boolean"), ("is_tradein", "boolean", "is_tradein", "boolean"), ("tradein_sku", "string", "tradein_sku", "string"), ("tradein_value", "double", "tradein_value", "double"), ("cost", "double", "cost", "double"), ("is_reservable", "boolean", "is_reservable", "boolean"), ("reservation_sku", "string", "reservation_sku", "string"), ("reservation_effective_date", "string", "reservation_effective_date", "string"), ("reservation_minimum_deposit", "double", "reservation_minimum_deposit", "double"), ("retail_sku", "string", "retail_sku", "string"), ("serial_number_type", "int", "serial_number_type", "int"), ("eregistration", "string", "eregistration", "string"), ("is_layaway_eligible", "boolean", "is_layaway_eligible", "boolean"), ("layaway_start_date", "string", "layaway_start_date", "string"), ("layaway_end_date", "string", "layaway_end_date", "string"), ("commission_type", "string", "commission_type", "string"), ("loyalty_program_id", "int", "loyalty_program_id", "int"), ("is_loyalty_renewal", "boolean", "is_loyalty_renewal", "boolean"), ("loyalty_duration", "int", "loyalty_duration", "int"), ("posa_card_id", "int", "posa_card_id", "int"), ("presell_hours", "int", "presell_hours", "int"), ("is_clearance_flagged_by_channel", "boolean", "is_clearance_flagged_by_channel", "boolean"), ("is_di_approved", "boolean", "is_di_approved", "boolean"), ("is_merchant_director_approved", "boolean", "is_merchant_director_approved", "boolean"), ("is_merchant_approved", "boolean", "is_merchant_approved", "boolean"), ("web_publisher_name", "string", "web_publisher_name", "string"), ("is_rsb_eligible", "string", "is_rsb_eligible", "string"), ("is_replenishable", "string", "is_replenishable", "string"), ("is_directship", "string", "is_directship", "string"), ("is_defective_return_only", "boolean", "is_defective_return_only", "boolean"), ("recommerce", "string", "recommerce", "string"), ("shipping", "struct", "shipping", "struct")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
# The `## @args` annotation above must stay on ONE comment line: wrapped
# continuation lines without a leading `#` are bare Python and break the script.
#
# Every column is mapped to itself with an unchanged type, so the columns are
# listed once as (name, type) pairs and expanded below into ApplyMapping's
# (source_name, source_type, target_name, target_type) form.
# NOTE(review): columns not listed here are presumably dropped by ApplyMapping
# (per the AWS Glue docs) — confirm before adding/removing entries.
_PRODUCT_COLUMNS = [
    ("hierarchy", "struct"),
    ("group", "struct"),
    ("master", "struct"),
    ("sku", "string"),
    ("display_title", "string"),
    ("erp_title", "string"),
    ("item_type", "string"),
    ("item_number", "int"),
    ("lifecycle_status_code", "int"),
    ("lifecycle_status", "string"),
    ("category_name", "string"),
    ("category_code", "string"),
    ("category_type", "string"),
    ("genre_subgenre_id", "int"),
    ("genre_name", "string"),
    ("sub_genre_name", "string"),
    ("allspark_id", "string"),
    ("barcodes", "struct"),
    ("vendor_part_number", "string"),
    ("vendor_id", "int"),
    ("vendor_name", "string"),
    ("publisher_id", "int"),
    ("publisher_name", "string"),
    ("esrb_rating", "string"),
    ("esrb_descriptors", "string"),
    ("is_embargoed", "boolean"),
    ("embargo_date", "string"),
    ("is_streetdate_enforced", "boolean"),
    ("street_date", "string"),
    ("is_prp_eligible", "string"),
    ("prp_group", "string"),
    ("is_gpg_eligible", "string"),
    ("is_sellable", "boolean"),
    ("is_real_time_message", "boolean"),
    ("family_name", "string"),
    ("developer_name", "string"),
    ("is_drop_ship", "boolean"),
    ("legacy_product_id", "int"),
    ("mpaa_rating", "string"),
    ("brand_name", "string"),
    ("manufacturer_name", "string"),
    ("platform_name", "string"),
    ("report_category_code", "string"),
    ("report_category_name", "string"),
    ("sub_category_code", "int"),
    ("sub_category_name", "string"),
    ("product_page_type", "int"),
    ("ecp_genre_subgenre_id", "int"),
    ("ecp_genre_name", "string"),
    ("ecp_subgenre_name", "string"),
    ("ip_owner_name", "string"),
    ("franchise_name", "string"),
    ("character_name", "string"),
    ("common_title", "string"),
    ("seasonality", "string"),
    ("seasonality_year", "string"),
    ("non_gaap_commission_type", "string"),
    ("non_gaap_amount", "string"),
    ("digital_publisher_commission_type", "string"),
    ("digital_publisher_amount", "string"),
    ("digital_bundle_credit_type", "string"),
    ("digital_bundle_amount", "string"),
    ("end_of_life_date", "string"),
    ("size", "string"),
    ("gender", "string"),
    ("material", "string"),
    ("color", "string"),
    ("color_tech", "string"),
    ("memory_size", "string"),
    ("oem", "string"),
    ("connectivity", "string"),
    ("edition", "string"),
    ("carriers", "string"),
    ("online_override_street_date", "string"),
    ("preload_date", "string"),
    ("eta_date", "string"),
    ("first_received_date", "string"),
    ("last_master_id", "int"),
    ("masterpack_qty", "int"),
    ("country_of_origin", "string"),
    ("isbn", "string"),
    ("return_type", "string"),
    ("is_emp_discount_allowed", "boolean"),
    ("commission", "double"),
    ("sales_price", "double"),
    ("use_default_tradein", "boolean"),
    ("is_tradein", "boolean"),
    ("tradein_sku", "string"),
    ("tradein_value", "double"),
    ("cost", "double"),
    ("is_reservable", "boolean"),
    ("reservation_sku", "string"),
    ("reservation_effective_date", "string"),
    ("reservation_minimum_deposit", "double"),
    ("retail_sku", "string"),
    ("serial_number_type", "int"),
    ("eregistration", "string"),
    ("is_layaway_eligible", "boolean"),
    ("layaway_start_date", "string"),
    ("layaway_end_date", "string"),
    ("commission_type", "string"),
    ("loyalty_program_id", "int"),
    ("is_loyalty_renewal", "boolean"),
    ("loyalty_duration", "int"),
    ("posa_card_id", "int"),
    ("presell_hours", "int"),
    ("is_clearance_flagged_by_channel", "boolean"),
    ("is_di_approved", "boolean"),
    ("is_merchant_director_approved", "boolean"),
    ("is_merchant_approved", "boolean"),
    ("web_publisher_name", "string"),
    ("is_rsb_eligible", "string"),
    ("is_replenishable", "string"),
    ("is_directship", "string"),
    ("is_defective_return_only", "boolean"),
    ("recommerce", "string"),
    ("shipping", "struct"),
]
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[(col, dtype, col, dtype) for col, dtype in _PRODUCT_COLUMNS],
    transformation_ctx="applymapping1",
)
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
# Resolve ambiguous column types in the mapped frame using the "make_struct"
# strategy (per the AWS Glue docs, each ambiguous column becomes a struct with
# one field per candidate type — confirm against the Glue version in use).
_resolve_strategy = "make_struct"
resolvechoice2 = ResolveChoice.apply(
    frame=applymapping1,
    choice=_resolve_strategy,
    transformation_ctx="resolvechoice2",
)
## @type: Relationalize
## @args: [staging_path = "s3://aws-glue-temporary-532655298523-us-east-1", name = "root", transformation_ctx = "dfc"]
## @return: flatten
## @inputs: [frame = resolvechoice2]
# Flatten the nested frame with Relationalize, which returns a
# DynamicFrameCollection and stages intermediate data under `glue_temp_storage`.
# NOTE(review): the staging bucket is hard-coded to one AWS account/region.
glue_temp_storage = "s3://aws-glue-temporary-532655298523-us-east-1"
dfc_root_table_name = "root"
# Pass the variables themselves, not their names as quoted strings. Quoting
# them made the staging path the literal text "glue_temp_storage" and named the
# root frame "dfc_root_table_name", so dfc.select("root") below could not find it.
dfc = Relationalize.apply(
    frame=resolvechoice2,
    staging_path=glue_temp_storage,
    name=dfc_root_table_name,
    transformation_ctx="dfc",
)
# Only the root frame continues down the pipeline; any other frames in the
# collection are not written by this script.
flatten = dfc.select(dfc_root_table_name)
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = flatten]
# Remove null fields from the flattened root frame before it is written.
# (The @inputs annotation previously named resolvechoice2; the actual input
# is `flatten`, the root frame selected from the Relationalize output.)
# NOTE(review): per the AWS Glue docs this drops fields that are null in every
# record — confirm that is the intended cleanup.
dropnullfields3 = DropNullFields.apply(
    frame=flatten,
    transformation_ctx="dropnullfields3",
)
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://gs-parquet-files"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
# Write the cleaned frame to the S3 bucket below in Parquet format.
_sink_options = {"path": "s3://gs-parquet-files"}
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=dropnullfields3,
    connection_type="s3",
    connection_options=_sink_options,
    format="parquet",
    transformation_ctx="datasink4",
)
# Finish the job run begun by job.init() at the top of the script.
job.commit()
# End of AWS Glue job script.