Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
"""
Deduplicate organization names.
"""
import json
import logging
import pickle
import random
from io import StringIO
from itertools import combinations
import plac
from dedupe import Dedupe
from prodigy.util import split_evals
from textacy.fileio import read_json_lines
from pipeline.model import create_session, DB_URL, Entity
from pipeline.run_prodigy import normalize_examples
logger = logging.getLogger(__name__)
class Deduper(Dedupe):
    """Dedupe model that can be retrained while labeling is in progress.

    This started out as the module-level functions further down in this file;
    moving that logic onto the class is still a work in progress.

    Notes
    -----
    Labeling is done in Prodigy rather than through the CLI, because it is
    faster during active learning. The intended workflow is to keep labeling
    candidate pairs as distinct or duplicate and to retrain the dedupe model
    periodically. (After every batch, Prodigy calls the `update` method,
    retraining the model.)
    """

    def update(self, answers):
        """Add a batch of Prodigy answers to the training pairs and retrain.

        :param answers: Sequence of Prodigy answers; it is converted with
            ``prodigy_to_dedupe`` before being handed to ``markPairs``.
        """
        batch_size = len(answers)
        logger.info('Updating! {} pairs'.format(batch_size))
        labeled = prodigy_to_dedupe(answers)
        self.markPairs(labeled)
        try:
            self.train()
        except TypeError as err:
            # Best effort: a TypeError during training is logged, not raised,
            # so the labeling session keeps running.
            logger.error('viewvalues generator emtpy? {}'.format(err))

    def evaluate(self):
        """Placeholder: evaluation still lives in the module-level ``evaluate``."""
        return None
def reshape_for_dedupe(rows, fields):
    """Reshape database rows for Dedupe.

    :param rows: A RowProxy, or a sequence of dictionaries that have an 'id' key and whatever fields are specified.
    :param fields: Deduper fields dictionary (https://dedupe.io/developers/library/en/latest/Variable-definition.html).
    :returns: Dictionary keyed by each row's 'id'; every value holds only the
        columns named by a 'field' entry in ``fields``. If two rows share an
        'id', the later row replaces the earlier one.
    """
    # The same column can back several variable definitions, so collapse the
    # names into a set once instead of scanning a list for every key of every row.
    wanted_columns = {definition['field'] for definition in fields}
    return {
        row['id']: {key: value for key, value in row.items() if key in wanted_columns}
        for row in rows
    }
def database_to_dedupe(state, limit, url=DB_URL):
    """Load dedupe candidates and a name corpus from the database.

    :param state: Value the ``Entity.state`` column is filtered on.
    :param limit: Maximum number of entities to read; a falsy value means no limit.
    :param url: Database URL passed to ``create_session``.
    :returns: ``(candidates, corpus)`` where ``candidates`` is a list holding the
        ``__dict__`` of every selected entity and ``corpus`` is the list of
        names produced by the same query with ``distinct('name')`` applied.
    """
    session = create_session(url=url)
    query = session.query(Entity).filter_by(state=state)
    if limit:
        query = query.limit(limit)

    candidates = []
    for entity in query.all():
        candidates.append(entity.__dict__)

    # Unique names feed the tf-idf corpus
    distinct_entities = query.distinct('name').all()
    corpus = [entity.name for entity in distinct_entities]
    return candidates, corpus
def dedupe_to_prodigy(pair):
    """Shape pairs from Dedupe for Prodigy.

    :param pair: Sequence whose first element is the group of record
        dictionaries to compare (presumably the shape returned by
        ``uncertainPairs`` -- confirm against the dedupe version in use).
    :returns: Dictionary with '_input_hash' and 'id' (the same integer) and
        'options', a list holding a copy of every record with an added 'id'
        key set to its position in the group.
    """
    # Local import: zlib is not among this module's top-level imports.
    import zlib

    members = pair[0]
    text = ''.join(str(value) for member in members for value in member.values())
    # The built-in hash() of a str is salted per process, so it cannot identify
    # the same pair across runs; CRC32 of the text is stable.
    stable_hash = zlib.crc32(text.encode('utf-8'))

    options = []
    for position, member in enumerate(members):
        # Copy instead of writing 'id' into the caller's record: a mutated
        # record would feed its injected 'id' into the hash text the next time
        # it shows up in a pair.
        option = dict(member)
        option['id'] = position
        options.append(option)

    return {
        '_input_hash': stable_hash,
        'id': stable_hash,
        'options': options,
    }
def stream_pairs(deduper):
    """Yield Prodigy examples for the pairs Dedupe is least certain about.

    Keeps asking ``deduper.uncertainPairs()`` for another pair and yields it
    reshaped by ``dedupe_to_prodigy``. The stream ends, with an info log line,
    as soon as a ``ValueError`` or ``IndexError`` is raised.
    """
    while True:
        try:
            candidate = deduper.uncertainPairs()
            yield dedupe_to_prodigy(candidate)
        except (ValueError, IndexError):
            logger.info('Pairs exhausted')
            return
def prodigy_to_dedupe(examples):
    """Turn Prodigy answers into Dedupe training pairs.

    An answer from Prodigy's "choice" view looks like this:

        {
            "options": [
                {"id": 1, "text": "Option 1"},
                {"id": 2, "text": "Option 2"}
            ],
            "answer": "accept"
        }

    :param examples: Iterable of such answers.
    :returns: ``{'match': [...], 'distinct': [...]}``: the 'options' of every
        accepted answer go under 'match', those of every rejected answer under
        'distinct'. Answers with any other value are left out.
    """
    matches = []
    distinct = []
    for eg in examples:
        verdict = eg['answer']
        if verdict == 'accept':
            matches.append(eg['options'])
        elif verdict == 'reject':
            distinct.append(eg['options'])
    return {'match': matches, 'distinct': distinct}
def init_dedupe(state, dupe_path, train_path, limit, db_url=DB_URL):
    """Build a ``Deduper`` over organization names, optionally with training data.

    :param state: Database state name, used when reading from the database.
    :param dupe_path: Path to a pickle holding ``(candidates, corpus)``. When
        falsy, both are read from the database instead.
    :param train_path: Path to a JSON-lines file of Prodigy answers used as
        existing training data; skipped when falsy.
    :param limit: Maximum number of database rows to read (falsy means no limit).
    :param db_url: Database URL used when reading from the database.
    :returns: The ``Deduper``, sampled and (if ``train_path`` is given) loaded
        with training pairs.
    :raises AssertionError: If no candidates or no corpus could be loaded.
    """
    if dupe_path:
        # Read names from disk
        logger.info('Loading dedupe candidates, corpus from disk')
        # NOTE: unpickling can execute arbitrary code -- only point this at
        # files this project wrote itself.
        with open(dupe_path, 'rb') as f:
            candidates, corpus = pickle.load(f)
    else:
        # Read names from database. The keyword has to be `url`: that is the
        # name of database_to_dedupe's parameter (passing `db_url` raises
        # TypeError).
        candidates, corpus = database_to_dedupe(state=state, limit=limit, url=db_url)
    assert candidates
    assert corpus

    # Instantiate a deduper with organization names
    fields = [
        {'field': 'name', 'type': 'String'},
        {'field': 'name', 'type': 'Text', 'corpus': corpus},
    ]
    deduper = Deduper(fields)

    # Prepare and load records to dedupe
    dedupe_data = reshape_for_dedupe(candidates, fields)
    deduper.sample(dedupe_data)

    if train_path:
        # Load existing training data
        train = prodigy_to_dedupe(read_json_lines(train_path))
        for label in ['match', 'distinct']:
            # Keep only pairs in which both records carry a 'text' key
            train[label] = [pair for pair in train[label] if 'text' in pair[0] and 'text' in pair[1]]
        # readTraining is given a file-like object, so wrap the JSON in StringIO
        deduper.readTraining(StringIO(json.dumps(train)))
    return deduper
def _reshape_predictions(clusters):
"""Wrangle predictions during model evaluation.
"""
predictions = set([])
for cluster in clusters:
if len(cluster[0]) > 1 and cluster[1][0] > .5:
for pair in combinations(cluster[0], 2):
predictions.add(frozenset(pair))
return predictions
def _score_clusters(found_dupes, true_dupes):
"""Score model performance.
"""
true_positives = found_dupes.intersection(true_dupes)
false_positives = found_dupes.difference(true_dupes)
uncovered_dupes = true_dupes.difference(found_dupes)
print('Found {} duplicates among {} true duplicates'.format(len(found_dupes), len(true_dupes)))
scores = {
'Precision': 1 - len(false_positives) / float(len(found_dupes)),
'Recall': len(true_positives) / float(len(true_dupes)),
}
scores['F_1'] = 2 * (scores['Precision'] * scores['Recall']) / (scores['Precision'] + scores['Recall'])
for k, v in scores.items():
print('{:<10} {:.2f}'.format(k, v))
return scores
@plac.annotations(
    state=('Database state name'),
    source=('Path to dedupe candidates, corpus', 'option'),
    threshold=('Predicted match probabilities above this threshold are clustered', 'option'),
    props=('A sequence of proportions of training data to use during evalutaion', 'option', None, list)
)
def evaluate(state='texas', source='dedupe-models/dedupe_export.jsonl', threshold=.7, props=[1]):
    """Train on part of the labeled pairs and score the model on held-out pairs.

    :param state: Database state name, forwarded to ``init_dedupe``.
    :param source: Path to a JSON-lines file of labeled pairs.
    :param threshold: Forwarded to ``deduper.match`` as its ``threshold``.
    :param props: Proportions of the training split to train on; one model is
        trained and scored per entry. The default list is never mutated.
        NOTE(review): the plac ``list`` converter presumably splits a
        command-line string into single characters -- confirm before relying on
        this option from the CLI.
    :returns: List with one ``{prop: scores}`` dictionary per proportion.
    """
    labeled_pairs = list(read_json_lines(source))
    labeled_pairs = normalize_examples(labeled_pairs)
    print('{} labeled pairs'.format(len(labeled_pairs)))
    random.shuffle(labeled_pairs)

    scores = []
    for prop in props:
        # `prop` is used as given; it was previously overwritten with 1 here,
        # so every run trained on the full training split.
        deduper = init_dedupe(state, dupe_path=None, train_path=None, limit=0)
        train, test, _ = split_evals(labeled_pairs, .2)
        print('{}-{} split'.format(len(test), len(train)))
        train = train[:int(prop * len(train))]
        train = prodigy_to_dedupe(train)
        test = prodigy_to_dedupe(test)
        deduper.markPairs(train)
        deduper.train()

        # Index every test record by the hash of its text; only the pairs
        # labeled as matches count as true duplicates.
        true_pairs = set()
        test_pairs = {}
        for label in ('match', 'distinct'):
            for pair in test[label]:
                hash0 = hash(pair[0]['text'])
                hash1 = hash(pair[1]['text'])
                test_pairs[hash0] = pair[0]
                test_pairs[hash1] = pair[1]
                if label == 'match':
                    true_pairs.add(frozenset((hash0, hash1)))

        clusters = deduper.match(test_pairs, threshold=threshold)
        predictions = _reshape_predictions(clusters)
        scores.append(
            {prop: _score_clusters(predictions, true_pairs)}
        )

    # Summary: one line per proportion
    for s in scores:
        for prop, v in s.items():
            print('{0}: {Precision:.2f} {Recall:.2f} {F_1:.2f}'.format(prop, **v))
    return scores
def select_candidates(state, limit=0, url=DB_URL):
    """Build a subquery selecting the entities of one state.

    :param state: Value the ``Entity.state`` column is filtered on.
    :param limit: Maximum number of entities; a falsy value means no limit.
    :param url: Database URL passed to ``create_session``.
    :returns: The result of calling ``subquery()`` on the filtered (and
        optionally limited) entity query.
        NOTE(review): the function used to end in ``return candidates`` with no
        such name defined (NameError); returning the subquery is the presumed
        intent -- confirm.
    """
    session = create_session(url=url)
    entities = session.query(Entity).filter_by(state=state)
    if limit:
        entities = entities.limit(limit)
    candidates = entities.subquery()
    return candidates
if __name__ == '__main__':
    # Path is not among the module's top-level imports, so bring it in here;
    # without it the last line below fails with NameError.
    from pathlib import Path

    evaluate('texas')

    # Convert a Prodigy export into dedupe's training format and save it as JSON
    examples = read_json_lines('dedupe-models/tf.jsonl')
    dedupe_tf = prodigy_to_dedupe(examples)
    Path('dedupe-models/tf.json').write_text(json.dumps(dedupe_tf))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.