Created
May 18, 2018 01:26
-
-
Save jamesdunham/c8055c232673895038b49dd13b09fcbe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Deduplicate organization names. | |
""" | |
import json | |
import logging | |
import pickle | |
import random | |
from io import StringIO | |
from itertools import combinations | |
import plac | |
from dedupe import Dedupe | |
from prodigy.util import split_evals | |
from textacy.fileio import read_json_lines | |
from pipeline.model import create_session, DB_URL, Entity | |
from pipeline.run_prodigy import normalize_examples | |
logger = logging.getLogger(__name__) | |
class Deduper(Dedupe):
    """Deduplicate database entries with dedupe.

    The initial implementation was via the functions below. Refactor is WIP.

    Notes
    -----
    We use Prodigy for fast labeling during active learning, rather than the CLI. The intended workflow is continuous
    labeling of candidate pairs as distinct or duplicates, with periodic retraining of the dedupe model. (After every
    batch, Prodigy calls the `update` method, retraining the model.)
    """

    def update(self, answers):
        """Retrain the model with new data, at the end of each batch.

        :param answers: Prodigy "choice"-view answers (see `prodigy_to_dedupe`
            for the expected shape).
        """
        logger.info('Updating! {} pairs'.format(len(answers)))
        answers = prodigy_to_dedupe(answers)
        self.markPairs(answers)
        try:
            self.train()
        except TypeError as e:
            # dedupe's train() raises TypeError when it has no labeled pairs
            # to learn from yet; log and carry on rather than crash the batch.
            # (Bug fix: log message previously read "emtpy".)
            logger.error('viewvalues generator empty? {}'.format(e))

    def evaluate(self):
        # Placeholder: evaluation still happens via the module-level evaluate().
        pass
def reshape_for_dedupe(rows, fields):
    """Reshape database rows for Dedupe.

    :param rows: A RowProxy, or a sequence of dictionaries that have an 'id' key and whatever fields are specified.
    :param fields: Deduper fields dictionary (https://dedupe.io/developers/library/en/latest/Variable-definition.html).
    :returns: Dictionary keyed by row id that can be consumed by a Deduper.
    """
    # Use a set for O(1) membership tests (the old list scan was O(n) per key).
    wanted = {spec['field'] for spec in fields}
    return {
        row['id']: {k: v for k, v in row.items() if k in wanted}
        for row in rows
    }
def database_to_dedupe(state, limit, url=DB_URL):
    """Read ORGs and registrants from the database for Dedupe.

    :param state: State name the entities are filtered on.
    :param limit: If truthy, cap the number of candidate rows fetched.
    :param url: Database connection URL.
    :returns: (candidates, corpus) — entity attribute dicts, and the list of
        unique entity names used as the tf-idf corpus.
    """
    session = create_session(url=url)
    query = session.query(Entity).filter_by(state=state)
    if limit:
        query = query.limit(limit)
    candidates = [entity.__dict__ for entity in query.all()]
    # Unique names only, for tf-idf
    corpus = [entity.name for entity in query.distinct('name').all()]
    return candidates, corpus
def dedupe_to_prodigy(pair):
    """Shape a candidate pair from Dedupe into a Prodigy "choice" task."""
    # Concatenate every field value of both members into one string; its hash
    # serves as a stable id for this pair within the current process.
    joined = ''.join(str(value) for member in pair[0] for value in member.values())
    pair_hash = hash(joined)
    options = []
    for idx, member in enumerate(pair[0]):
        member['id'] = idx
        options.append(member)
    return {
        '_input_hash': pair_hash,
        'id': pair_hash,
        'options': options,
    }
def stream_pairs(deduper):
    """Yield Prodigy tasks for Dedupe's most uncertain candidate pairs.

    Iterates until the deduper signals exhaustion by raising ValueError or
    IndexError from uncertainPairs().
    """
    try:
        while True:
            yield dedupe_to_prodigy(deduper.uncertainPairs())
    except (ValueError, IndexError):
        logger.info('Pairs exhausted')
def prodigy_to_dedupe(examples):
    """Shape answers from Prodigy for Dedupe.

    A "choice"-view answer coming back from Prodigy looks like this:
    {
        "options": [
            {"id": 1, "text": "Option 1"},
            {"id": 2, "text": "Option 2"}
        ]
        "answer": "accept"
    }

    :returns: dict with 'match' and 'distinct' lists of option pairs. Answers
        that are neither accept nor reject (e.g. "ignore") are dropped.
    """
    label_for_answer = {'accept': 'match', 'reject': 'distinct'}
    train = {'match': [], 'distinct': []}
    for example in examples:
        label = label_for_answer.get(example['answer'])
        if label is not None:
            train[label].append(example['options'])
    return train
def init_dedupe(state, dupe_path, train_path, limit, db_url=DB_URL):
    """Create, sample, and optionally pre-train a Deduper over entity names.

    :param state: Database state name (used when reading from the database).
    :param dupe_path: Optional path to a pickle of (candidates, corpus); when
        given, names are read from disk instead of the database.
    :param train_path: Optional path to a JSONL file of Prodigy answers used
        to pre-train the model.
    :param limit: If truthy, cap the number of candidates read from the database.
    :param db_url: Database connection URL.
    :returns: A sampled (and possibly pre-trained) Deduper.
    """
    if dupe_path:
        # Read names from disk
        logger.info('Loading dedupe candidates, corpus from disk')
        with open(dupe_path, 'rb') as f:
            candidates, corpus = pickle.load(f)
    else:
        # Read names from database. Bug fix: database_to_dedupe's keyword is
        # `url`, not `db_url` — the old call raised TypeError unconditionally.
        candidates, corpus = database_to_dedupe(state=state, limit=limit, url=db_url)
    assert candidates
    assert corpus

    # Instantiate a deduper with organization names
    fields = [
        {'field': 'name', 'type': 'String'},
        {'field': 'name', 'type': 'Text', 'corpus': corpus},
    ]
    deduper = Deduper(fields)

    # Prepare and load records to dedupe
    dedupe_data = reshape_for_dedupe(candidates, fields)
    deduper.sample(dedupe_data)

    if train_path:
        # Load existing training data, keeping only pairs whose two members
        # both carry the 'text' field dedupe needs.
        train = prodigy_to_dedupe(list(read_json_lines(train_path)))
        for label in ('match', 'distinct'):
            train[label] = [pair for pair in train[label]
                            if 'text' in pair[0] and 'text' in pair[1]]
        # readTraining expects a file-like object containing JSON.
        deduper.readTraining(StringIO(json.dumps(train)))
    return deduper
def _reshape_predictions(clusters): | |
"""Wrangle predictions during model evaluation. | |
""" | |
predictions = set([]) | |
for cluster in clusters: | |
if len(cluster[0]) > 1 and cluster[1][0] > .5: | |
for pair in combinations(cluster[0], 2): | |
predictions.add(frozenset(pair)) | |
return predictions | |
def _score_clusters(found_dupes, true_dupes): | |
"""Score model performance. | |
""" | |
true_positives = found_dupes.intersection(true_dupes) | |
false_positives = found_dupes.difference(true_dupes) | |
uncovered_dupes = true_dupes.difference(found_dupes) | |
print('Found {} duplicates among {} true duplicates'.format(len(found_dupes), len(true_dupes))) | |
scores = { | |
'Precision': 1 - len(false_positives) / float(len(found_dupes)), | |
'Recall': len(true_positives) / float(len(true_dupes)), | |
} | |
scores['F_1'] = 2 * (scores['Precision'] * scores['Recall']) / (scores['Precision'] + scores['Recall']) | |
for k, v in scores.items(): | |
print('{:<10} {:.2f}'.format(k, v)) | |
return scores | |
@plac.annotations(
    state=('Database state name'),
    source=('Path to dedupe candidates, corpus', 'option'),
    threshold=('Predicted match probabilities above this threshold are clustered', 'option'),
    props=('A sequence of proportions of training data to use during evaluation', 'option', None, list)
)
def evaluate(state='texas', source='dedupe-models/dedupe_export.jsonl', threshold=.7, props=(1,)):
    """Evaluate deduper performance on held-out labeled pairs.

    For each proportion in `props`, trains a fresh deduper on that fraction of
    the training split and scores its predicted clusters against the test split.

    :param state: Database state name.
    :param source: Path to a JSONL export of labeled pairs.
    :param threshold: Predicted match probabilities above this are clustered.
    :param props: Proportions of training data to try. (Immutable tuple
        default replaces the old shared mutable list default.)
    :returns: list of {proportion: scores-dict} entries.
    """
    labeled_pairs = [line for line in read_json_lines(source)]
    labeled_pairs = normalize_examples(labeled_pairs)
    print('{} labeled pairs'.format(len(labeled_pairs)))
    random.shuffle(labeled_pairs)
    scores = []
    for prop in props:
        # Bug fix: a leftover debug line `prop = 1` here previously overrode
        # every requested proportion, making the props sweep a no-op.
        deduper = init_dedupe(state, dupe_path=None, train_path=None, limit=0)
        train, test, _ = split_evals(labeled_pairs, .2)
        print('{}-{} split'.format(len(test), len(train)))
        train = train[:int(prop * len(train))]
        train = prodigy_to_dedupe(train)
        test = prodigy_to_dedupe(test)
        deduper.markPairs(train)
        deduper.train()
        true_pairs = set()
        test_pairs = {}
        # Index both members of every test pair by the hash of their text;
        # only 'match' pairs contribute to the ground-truth duplicate set.
        for pair in test['match']:
            hash0 = hash(pair[0]['text'])
            hash1 = hash(pair[1]['text'])
            test_pairs[hash0] = pair[0]
            test_pairs[hash1] = pair[1]
            true_pairs.add(frozenset((hash0, hash1)))
        for pair in test['distinct']:
            hash0 = hash(pair[0]['text'])
            hash1 = hash(pair[1]['text'])
            test_pairs[hash0] = pair[0]
            test_pairs[hash1] = pair[1]
        clusters = deduper.match(test_pairs, threshold=threshold)
        predictions = _reshape_predictions(clusters)
        scores.append(
            {prop: _score_clusters(predictions, true_pairs)}
        )
    for entry in scores:
        for prop, v in entry.items():
            print('{0}: {Precision:.2f} {Recall:.2f} {F_1:.2f}'.format(prop, **v))
    return scores
def select_candidates(state, limit=0, url=DB_URL):
    """Select candidate entities for deduplication.

    NOTE(review): the original body ended with `entities.subquery()` and then
    returned an undefined name `candidates` (NameError on every call) — it was
    clearly unfinished. Returning entity attribute dicts here, mirroring
    `database_to_dedupe`; confirm the intended return shape with callers.

    :param state: Database state name to filter on.
    :param limit: If truthy, cap the number of rows.
    :param url: Database connection URL.
    :returns: list of entity attribute dicts.
    """
    session = create_session(url=url)
    entities = session.query(Entity).filter_by(state=state)
    if limit:
        entities = entities.limit(limit)
    candidates = [ent.__dict__ for ent in entities.all()]
    return candidates
if __name__ == '__main__':
    evaluate('texas')

    # Convert a Prodigy JSONL export into dedupe's training JSON format.
    # Bug fix: Path was never imported, so this block raised NameError.
    from pathlib import Path
    examples = read_json_lines('dedupe-models/tf.jsonl')
    dedupe_tf = prodigy_to_dedupe(examples)
    Path('dedupe-models/tf.json').write_text(json.dumps(dedupe_tf))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment