@rom1504
Created February 19, 2023 21:47
bucket_dedup.py
"""
This is an approximate deduplication method using PySpark.
input: a table with an id and 2 columns that contain float values
2 items are considered the same if both float values fall into the same bucket of width 0.05
(values less than 0.05 apart can still land in adjacent buckets, so the threshold is approximate)
algo: divide the 2 columns by 0.05 and round, resulting in 2 longs. Then use PySpark to perform exact dedup on these 2 longs
Spark groups equal keys with a distributed shuffle and then dedups each partition in a linear pass, so this should scale to 100B rows
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16G")
    .config("spark.sql.shuffle.partitions", 300)
    .config("spark.local.dir", "/tmp/spark-tmp2")
    .master("local[64]")
    .appName("spark-stats")
    .getOrCreate()
)
p = "/fsx/mlrichter/dedublication/3billion/chunkall_to_unified_results.csv"
df = spark.read.csv(p, header=True)
df = df.withColumn("perpl_ontocord/riverbed_kenlm",df["perpl_ontocord/riverbed_kenlm"].cast('float'))
df = df.withColumn("perpl_ccnet/wikipedia", df["perpl_ccnet/wikipedia"].cast('float'))
df = df.withColumn("b1", F.round(F.col("perpl_ontocord/riverbed_kenlm") / 0.05).cast("long"))
df = df.withColumn("b2", F.round(F.col("perpl_ccnet/wikipedia") / 0.05).cast("long"))
df = df.drop_duplicates(["b1", "b2"])
df.repartition(100).write.mode("overwrite").parquet("output_parquet2")
df = spark.read.parquet("output_parquet2")
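# print the number of rows left after dedup (a bare df.count() shows nothing when run as a script)
print(df.count())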
"""
It is also possible to convert the 2 longs to bytes, concatenate the bytes, and use the result as a hash (same idea):

from pyspark.sql.types import BinaryType

def perplexity_to_bytes(perplexity, threshold=0.05):
    # 5 big-endian bytes per bucket; to_bytes raises on negative values, so perplexity must be non-negative and non-null
    bucket = round(perplexity / threshold).to_bytes(5, 'big')
    return bucket

conv = F.udf(perplexity_to_bytes, BinaryType())
df = df.withColumn("hash", F.concat(conv(F.col("perpl_ontocord/riverbed_kenlm")), conv(F.col("perpl_ccnet/wikipedia"))))
df = df.drop_duplicates(["hash"])
"""
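To see what the bucketing does on a handful of rows without starting Spark, here is a minimal pure-Python sketch of the same idea. The helper names `bucket` and `dedup` and the sample rows are hypothetical, made up for illustration; they are not part of the script above. It also assumes Python's built-in `round`, which rounds exact halves to the nearest even integer, whereas Spark's `F.round` rounds halves up, so values sitting exactly on a bucket boundary may be assigned differently.

```python
def bucket(value, threshold=0.05):
    """Map a float to an integer bucket index, mirroring round(value / threshold)."""
    return round(value / threshold)


def dedup(rows, threshold=0.05):
    """Keep the id of the first row seen for each (b1, b2) bucket pair."""
    seen = set()
    kept = []
    for row_id, x, y in rows:
        key = (bucket(x, threshold), bucket(y, threshold))
        if key not in seen:
            seen.add(key)
            kept.append(row_id)
    return kept


# Hypothetical sample rows: (id, first float column, second float column)
rows = [
    ("a", 10.01, 20.01),
    ("b", 10.02, 20.02),  # same bucket pair as "a", so it is dropped
    ("c", 10.03, 20.01),  # only 0.02 away from "a" in the first column, but in the next bucket
]

kept_ids = dedup(rows)
print(kept_ids)
```

Row "c" is kept even though it is closer than 0.05 to row "a", which shows why the threshold is only approximate: two values are merged when they share a bucket, not whenever they are within 0.05 of each other.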