Last active
November 26, 2021 12:06
-
-
Save aialenti/dfe0e4ac3af8f8f5481cf882e95aa17a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import * | |
from pyspark.sql import Row, Window | |
from pyspark.sql.types import IntegerType | |
import hashlib | |
# Init spark session | |
spark = SparkSession.builder \ | |
.master("local") \ | |
.config("spark.sql.autoBroadcastJoinThreshold", -1) \ | |
.config("spark.executor.memory", "1g") \ | |
.appName("Exercise1") \ | |
.getOrCreate() | |
# Load source data | |
products_table = spark.read.parquet("./data/products_parquet") | |
sales_table = spark.read.parquet("./data/sales_parquet") | |
sellers_table = spark.read.parquet("./data/sellers_parquet") | |
# Define the UDF function | |
def algo(order_id, bill_text): | |
# If number is even | |
ret = bill_text.encode("utf-8") | |
if int(order_id) % 2 == 0: | |
# Count number of 'A' | |
cnt_A = bill_text.count("A") | |
for _c in range(0, cnt_A): | |
ret = hashlib.md5(ret).hexdigest().encode("utf-8") | |
ret = ret.decode('utf-8') | |
else: | |
ret = hashlib.sha256(ret).hexdigest() | |
return ret | |
# Register the UDF function. | |
algo_udf = spark.udf.register("algo", algo) | |
# Use the `algo_udf` to apply the aglorithm and then check if there is any duplicate hash in the table | |
sales_table.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text")))\ | |
.groupby(col("hashed_bill")).agg(count("*").alias("cnt")).where(col("cnt") > 1).show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment