Last active
September 3, 2021 20:09
-
-
Save luizamboni/e5e142026067f53a81091dd201d4cb80 to your computer and use it in GitHub Desktop.
glue PySpark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Json Data
# {
#   "id": String
#   "products_count": Number
#   "products": [
#     { "id": Number, "t": String, "cpc": Number }
#   ]
# }
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# additional imports
from pyspark.sql.functions import sum
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import split, explode

# A SparkContext must exist before building the GlueContext; the original
# script referenced an undefined `sc`.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the clicks table from the Glue Data Catalog as a DynamicFrame.
ddf = glueContext.create_dynamic_frame.from_catalog(
    database="xxx",
    table_name="clicks",
    transformation_ctx="datasource0",
)

# DynamicFrame -> Spark DataFrame (the method is toDF, not toDf).
df = ddf.toDF()

# Keep a small sample of rows that actually carry products.
sample = df.where("products_count > 0").limit(2).collect()
df_products = df.where("products_count > 0").limit(2)

# One output row per product: explode the products array.
# NOTE: select from df_products (the filtered frame), not the full df —
# the original mixed columns from two different DataFrames, which both
# breaks column resolution and silently ignores the filter/limit above.
df_exploded = df_products.select(
    explode(df_products.products).alias("product"),
    "*",
)

# Flatten the product struct into top-level columns.
# `cpc` is cast to double; the original `.cpc.double` attribute access
# would try to read a nonexistent `double` field inside the struct.
df_product_attrs = df_exploded.select(
    df_exploded.product.id.alias("id"),
    df_exploded.product.t.alias("name"),
    df_exploded.product.cpc.cast("double").alias("cpc"),
    df_exploded.id.alias("impression_id"),
)

df_product_attrs.show()
df_product_attrs.printSchema()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# additional imports
from pyspark.sql.functions import sum
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import split, explode
from functools import reduce

# get a dataframe from source
# A SparkContext must exist before building the GlueContext; the original
# script referenced an undefined `sc`.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# (offer id, attribute name, attribute value) triples; ids repeat, one
# row per attribute value.
# NOTE: the original was missing the trailing commas after two of the
# 'Produto a base de misturas de solventes' tuples, which made Python
# parse `(...)(...)` as calling a tuple -> TypeError at runtime.
data = [
    (53, 'Inverter', 'Sim'),
    (53, 'Material', 'PVC'),
    (62, 'Inverter', 'Sim'),
    (62, 'Material', 'Borracha'),
    (62, 'Material', 'PVC'),
    (62, 'Material', 'Produto a base de misturas de solventes'),
    (10, 'Material', 'Borracha'),
    (10, 'Material', 'PVC'),
    (10, 'Material', 'Produto a base de misturas de solventes'),
    (10, 'Inverter', 'Sim'),
    (51, 'Inverter', 'Sim'),
    (51, 'Material', 'PVC'),
    (61, 'Inverter', 'Sim'),
    (61, 'Material', 'Borracha'),
    (61, 'Material', 'PVC'),
    (61, 'Material', 'Produto a base de misturas de solventes'),
    (11, 'Material', 'Borracha'),
    (11, 'Material', 'PVC'),
    (11, 'Material', 'Produto a base de misturas de solventes'),
    (11, 'Inverter', 'Sim'),
    (12, 'Material', 'Borracha'),
    (12, 'Material', 'PVC'),
    (12, 'Material', 'Produto a base de misturas de solventes'),
    (12, 'Inverter', 'Sim'),
]
columns = ["id", "name", "value"]
df = sc.parallelize(data).toDF(columns)

# Group all rows of each id into a list-of-rows "attributes" column.
lst_of_rows_df = df.rdd.map(
    lambda row: (row[0], row)
).groupByKey(
).mapValues(
    list
).toDF(['offer_id', 'attributes'])
lst_of_rows_df.show()
lst_of_rows_df.printSchema()
# root
#  |-- offer_id: long (nullable = true)
#  |-- attributes: struct (nullable = true)
#  |    |-- data: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- id: long (nullable = true)
#  |    |    |    |-- name: string (nullable = true)
#  |    |    |    |-- value: string (nullable = true)
#  |    |-- index: long (nullable = true)
#  |    |-- maxindex: long (nullable = true)

# Alternative grouping: fold each id's rows into a dict mapping
# attribute name -> list of values (so repeated names accumulate).
dict_of_values_df = df.rdd.map(
    lambda row: (row[0], row)
).groupByKey(
).mapValues(
    lambda rows: reduce(
        lambda m, c: {**m, **{c.asDict()['name']: m.get(c.asDict()['name'], []) + [c.asDict()['value']]}},
        rows,
        {}
    )
).toDF(['offer_id', 'attributes'])
dict_of_values_df.show()
dict_of_values_df.printSchema()
# root
#  |-- offer_id: long (nullable = true)
#  |-- attributes: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: array (valueContainsNull = true)
#  |    |    |-- element: string (containsNull = true)

# Take a 20-row sample and re-parallelize it into a DataFrame.
sample = df.head(20)
sample_df = sc.parallelize(sample).toDF()
print(type(sample_df))
sample_df.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment