Skip to content

Instantly share code, notes, and snippets.

@luizamboni
Last active September 3, 2021 20:09
Show Gist options
  • Save luizamboni/e5e142026067f53a81091dd201d4cb80 to your computer and use it in GitHub Desktop.
Save luizamboni/e5e142026067f53a81091dd201d4cb80 to your computer and use it in GitHub Desktop.
glue PySpark
# Json Data
# {
# "id": String
# "products_count": Number
# "products": [
# { "id": Number , "t": String , "cpc": Number }
# ]
# }
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# additional imports
from pyspark.sql.functions import sum
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import split, explode
# Read the `clicks` table from the Glue Data Catalog and flatten the nested
# `products` array (one row per product, keeping the parent impression id).
# BUG FIX: `sc` was never defined; getOrCreate() reuses the context that Glue
# notebooks already provide, and creates one otherwise.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

ddf = glueContext.create_dynamic_frame.from_catalog(
    database = "xxx",
    table_name = "clicks",
    transformation_ctx = "datasource0"
)
# BUG FIX: DynamicFrame's DataFrame converter is toDF(), not toDf().
df = ddf.toDF()

sample = df.where("products_count > 0").limit(2).collect()
df_products = df.where("products_count > 0").limit(2)

# BUG FIX: explode must select from df_products (the limited frame whose
# column is referenced); the original selected from `df` while referencing
# df_products.products, which fails analysis / ignores the limit.
df_exploded = df_products.select(
    explode(df_products.products).alias("product"),
    "*"
)

df_product_attrs = df_exploded.select(
    df_exploded.product.id.alias("id"),
    df_exploded.product.t.alias("name"),
    # BUG FIX: Column has no `.double` attribute; use cast("double").
    df_exploded.product.cpc.cast("double").alias("cpc"),
    df_exploded.id.alias("impression_id")
)

df_product_attrs.show()
df_product_attrs.printSchema()
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# additional imports
from pyspark.sql.functions import sum
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import split, explode
from functools import reduce
# get a dataframe from source
# Second example: build a small (id, attribute name, attribute value) dataset
# and load it into a DataFrame for the grouping demos below.
# BUG FIX: `sc` was never defined; getOrCreate() reuses an existing context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# BUG FIX: the original list was missing the comma after the tuples for
# ids 62 and 61, which made Python "call" the preceding tuple at runtime
# (TypeError: 'tuple' object is not callable).
data = [
    (53, 'Inverter', 'Sim'),
    (53, 'Material', 'PVC'),
    (62, 'Inverter', 'Sim'),
    (62, 'Material', 'Borracha'),
    (62, 'Material', 'PVC'),
    (62, 'Material', 'Produto a base de misturas de solventes'),
    (10, 'Material', 'Borracha'),
    (10, 'Material', 'PVC'),
    (10, 'Material', 'Produto a base de misturas de solventes'),
    (10, 'Inverter', 'Sim'),
    (51, 'Inverter', 'Sim'),
    (51, 'Material', 'PVC'),
    (61, 'Inverter', 'Sim'),
    (61, 'Material', 'Borracha'),
    (61, 'Material', 'PVC'),
    (61, 'Material', 'Produto a base de misturas de solventes'),
    (11, 'Material', 'Borracha'),
    (11, 'Material', 'PVC'),
    (11, 'Material', 'Produto a base de misturas de solventes'),
    (11, 'Inverter', 'Sim'),
    (12, 'Material', 'Borracha'),
    (12, 'Material', 'PVC'),
    (12, 'Material', 'Produto a base de misturas de solventes'),
    (12, 'Inverter', 'Sim'),
]
columns = ["id","name", "value"]
df = sc.parallelize(data).toDF(columns)
# group in attributes column
# Collapse the flat (id, name, value) rows into one row per offer, whose
# `attributes` column holds the list of the offer's original Row objects.
keyed_rows = df.rdd.map(lambda r: (r[0], r))
grouped_rows = keyed_rows.groupByKey().mapValues(list)
lst_of_rows_df = grouped_rows.toDF(['offer_id','attributes'])
lst_of_rows_df.show()
lst_of_rows_df.printSchema()
#root
# |-- offer_id: long (nullable = true)
# |-- attributes: struct (nullable = true)
# | |-- data: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- id: long (nullable = true)
# | | | |-- name: string (nullable = true)
# | | | |-- value: string (nullable = true)
# | |-- index: long (nullable = true)
# | |-- maxindex: long (nullable = true)
# group into a per-offer dict of attribute name -> list of values
# One row per offer; `attributes` is a dict mapping each attribute name to
# the list of values seen for that offer,
# e.g. {'Material': ['Borracha', 'PVC', ...], 'Inverter': ['Sim']}.
dict_of_values_df = df.rdd.map(
    # key each Row by its first field (the offer id)
    lambda row: (row[0], row )
).groupByKey(
).mapValues(
    # fold the offer's rows into one dict: for each row, append its 'value'
    # to the list stored under its 'name' key (starting from {}); the
    # {**m, **{...}} merge builds a fresh dict at every step
    lambda row: reduce(
        lambda m, c: {**m, **{ c.asDict()['name']: m.get(c.asDict()['name'],[]) + [c.asDict()['value']] } },
        row,
        {}
    )
).toDF(['offer_id','attributes'])
dict_of_values_df.show()
dict_of_values_df.printSchema()
# root
# |-- offer_id: long (nullable = true)
# |-- attributes: map (nullable = true)
# | |-- key: string
# | |-- value: array (valueContainsNull = true)
# | | |-- element: string (containsNull = true)
# get some 20 items of sample
sample = df.head(20)
sample_df = sc.parallelize(sample).toDF()
print(type(sample_df))
sample_df.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment