Skip to content

Instantly share code, notes, and snippets.

@yifeihuang
Last active March 1, 2021 10:26
Show Gist options
  • Save yifeihuang/5d58f713c75f60f978848f8a43c1fc5d to your computer and use it in GitHub Desktop.
Save yifeihuang/5d58f713c75f60f978848f8a43c1fc5d to your computer and use it in GitHub Desktop.
Candidate pair generation and initial match scoring
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Window as w
import numpy as np
from graphframes import GraphFrame
# Candidate pair generation: build a graph whose vertices are the records of
# `blocking_df` and whose edges join every two records that share at least
# one blocking key.
# NOTE(review): `blocking_df` is defined outside this snippet; it is assumed
# to carry a `uid` column, an array column `blocking_keys`, and every column
# named in `keep_cols` -- confirm against the upstream step.

# Attribute columns copied from blocking_df onto each vertex.
keep_cols = [
    'source', 'name', 'description', 'manufacturer', 'price',
    'name_swRemoved', 'description_swRemoved', 'manufacturer_swRemoved',
    'name_swRemoved_tfidf', 'description_swRemoved_tfidf',
    'manufacturer_swRemoved_tfidf',
    'name_encoding', 'description_encoding',
]

# Upper bound on the number of distinct records in a block. The self-join
# below emits n * (n - 1) / 2 pairs for a block of n records, so larger
# blocks are discarded.
LARGEST_BLOCK = 100

# Vertex frame: `uid` is exposed as `id` (the vertex id column GraphFrame
# looks for).
node = blocking_df.select(f.col('uid').alias('id'), *keep_cols)

# One row per (blocking_key, uid), restricted to blocks of usable size.
# block_size is taken from the de-duplicated uid set rather than from a row
# count, so a uid that lists the same key more than once is counted once and
# the 2..LARGEST_BLOCK bounds apply to distinct records.
keep_pairs = (
    blocking_df
    .select(f.explode('blocking_keys').alias('blocking_key'), 'uid')
    .groupBy('blocking_key')
    .agg(f.collect_set('uid').alias('uids'))
    .withColumn('block_size', f.size('uids'))
    .filter(f.col('block_size').between(2, LARGEST_BLOCK))
    .select('blocking_key', f.explode('uids').alias('uid'))
)

# Self-join on the blocking key to enumerate co-blocked records.
left = keep_pairs.withColumnRenamed('uid', 'src')
right = keep_pairs.withColumnRenamed('uid', 'dst')

candidate_pairs = (
    left.join(right, ['blocking_key'], 'inner')
    # src < dst drops self-pairs and keeps one orientation of each pair.
    .filter(f.col('src') < f.col('dst'))
    .select('src', 'dst')
    # Two records may share several blocking keys; keep a single edge.
    .distinct()
)

# Graph of records (vertices) linked by candidate pairs (edges).
g = GraphFrame(node, candidate_pairs)
@diegoquintanav
Copy link

what is 'blocking_keys' in this example?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment