Skip to content

Instantly share code, notes, and snippets.

@harry-stark
Created January 24, 2023 06:30
Show Gist options
  • Save harry-stark/0a0315d9ac918f5db6b7ed2c9c62d972 to your computer and use it in GitHub Desktop.
"""Find near-duplicate documents in a parquet text dataset with Spark MinHashLSH.

Pipeline: tokenize the ``text`` column, hash the token lists into fixed-size
term-frequency vectors, fit a MinHash LSH model, then self-join the dataset to
find document pairs within ``threshold`` Jaccard distance, and write the pairs
out as parquet while timing each stage.
"""
import time

from pyspark.ml.feature import HashingTF, MinHashLSH, Tokenizer

from spark_session_builder import build_spark_session

# Tuning knobs.
# NOTE(review): `hash_size` is used as HashingTF's feature-vector width
# (numFeatures), not as MinHashLSH's numHashTables — 100 features is very small
# and will cause heavy token collisions; confirm this is intentional.
hash_size = 100
# Maximum Jaccard distance for two documents to count as near-duplicates.
threshold = 0.8

spark = build_spark_session(
    master="spark://cpu128-dy-r6i-32xlarge-3:7077",
    num_cores=128,
    mem_gb=999,
)

start = time.time()

# NOTE(review): .limit(1000) looks like a debugging sample size — remove it
# for a full-dataset run.
data = spark.read.parquet(
    "/fsx/shared/pilev2_parquet/StackExchange_ver4/dataset.parquet"
).limit(1000)
print(data.count())

# Tokenize the text column and convert each token list to a sparse count vector.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(data)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=hash_size)
featurizedData = hashingTF.transform(wordsData)

# Fit MinHash signatures over the raw feature vectors.
mh = MinHashLSH(inputCol="rawFeatures", outputCol="minhash")
model = mh.fit(featurizedData)
data_transformed = model.transform(featurizedData)
print(data_transformed.count())
print("hashed")
data_transformed.show()

# Self-join via LSH to find candidate pairs within the distance threshold.
# (The debug `raise Exception("stop")` that previously made everything below
# unreachable has been removed.)
duplicates = model.approxSimilarityJoin(data_transformed, data_transformed, threshold)
# count() forces the join to materialize so the pre-write timing is honest.
print(duplicates.count())
end1 = time.time()

duplicates.write.parquet("./results/outs.parquet")
end2 = time.time()

print("Dedup time without writing", end1 - start)
print("Dedup time with writing", end2 - start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment