"""
author: Peb Ruswono Aryan
date: 20.02.2014
Simple Entity Resolution
read a text file (news) and resolve entities mentioned in the text
uses:
- external Part-of-Speech Tagger API (REST)
- Entity knowledge base over SPARQL with Regex filter
"""
import json
import requests
import sys
def levenshtein(a, b):
    """Calculate the Levenshtein distance between a and b.
    http://hetland.org/coding/python/levenshtein.py"""
    n, m = len(a), len(b)
    if n > m:
        # make sure n <= m, to use O(min(n, m)) space
        a, b = b, a
        n, m = m, n
    current = list(range(n + 1))
    for i in range(1, m + 1):
        previous, current = current, [i] + [0] * n
        for j in range(1, n + 1):
            add, delete = previous[j] + 1, current[j - 1] + 1
            change = previous[j - 1]
            if a[j - 1] != b[i - 1]:
                change += 1
            current[j] = min(add, delete, change)
    return current[n]
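# Sanity check (illustrative): turning "jakarta" into "jayakarta" takes two
# insertions, so levenshtein("jakarta", "jayakarta") == 2.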
def run_sparql(host, query, format="application/json", filename=""):
    params = {
        "query": query,
        "debug": "on",
        "timeout": "",
        "format": format,
    }
    r = requests.post(host, params=params)
    if r.status_code == requests.codes.ok:
        return r.json()
    else:
        raise Exception("SPARQL Error: HTTP %d" % r.status_code)
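# Example call (hypothetical query; assumes a Virtuoso-style endpoint that
# accepts these form parameters, as id.dbpedia.org did):
#   run_sparql("http://id.dbpedia.org/sparql",
#              'SELECT ?s WHERE { ?s rdfs:label "Jakarta"@id } LIMIT 1')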
def remote_tag(url, txt):
    params = {
        "teks": txt,
        "task": "postag"
    }
    r = requests.post(url, data=params)
    if r.status_code == requests.codes.ok:
        return r.text.split("\n")
    else:
        print(r.status_code)
        return r.content
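# The tagger response is assumed (from the parsing in __main__) to contain
# one sentence per line, each a space-separated sequence of "word/TAG"
# pairs, e.g.:
#   Budi/NNP membaca/VB koran/NN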
if __name__ == "__main__":
    NLP = "http://nlp.pebbie.net"
    ENDPOINT = "http://id.dbpedia.org/sparql"
    if len(sys.argv) > 1:
        # read from file
        with open(sys.argv[1]) as f:
            txt = f.read()
    else:
        sys.exit("usage: python %s <textfile>" % sys.argv[0])
    # use a web service to do POS tagging
    sent = remote_tag(NLP + '/handler', txt)

    # extract nouns & proper nouns as candidate entity chunks
    c_entities = []
    buffer = []
    stat = "non_ent"
    for s in sent:
        tmp = [tuple(term.split("/")) for term in s.split(" ")]
        for lex, tag in tmp:
            if stat == "non_ent":
                if tag in ["NN", "NNP"]:
                    buffer.append(lex)
                    stat = "ent"
            elif stat == "ent":
                if tag in ["NN", "NNP"]:
                    buffer.append(lex)
                else:
                    chunk = " ".join(buffer)
                    if chunk not in c_entities:
                        c_entities.append(chunk)
                    buffer = []
                    stat = "non_ent"
    # flush a chunk still buffered after the last sentence
    if buffer:
        chunk = " ".join(buffer)
        if chunk not in c_entities:
            c_entities.append(chunk)
    first_capital = lambda s: len(s) > 1 and s[0] == s[0].upper()

    # first filtering based on capitalized first character
    cap_filter = True
    if cap_filter:
        output = []
        for ent in c_entities:
            if any(first_capital(w) for w in ent.split()):
                output.append(ent)
        c_entities = output
    # print(c_entities)
    # second filtering based on subword split: ["A B C D", "C", "C D"] -> ["A B", "C D", "C"]
    sub_filter = True
    if sub_filter:
        output = []
        is_sub = {}
        for ent in c_entities:
            sub = [e for e in c_entities if e != ent and ent in e]
            if len(sub) == 0 and ent not in output:
                output.append(ent)
            else:
                is_sub[ent] = sub
        for ent, sub in is_sub.items():
            for esub in sub:
                if esub in output:
                    output.remove(esub)
                epos = esub.index(ent)
                first = esub[:epos].strip()
                rest = esub[epos:].strip()
                if first and first not in output:
                    if first_capital(first) or not cap_filter:
                        output.append(first)
                if rest and rest not in output:
                    if first_capital(rest) or not cap_filter:
                        output.append(rest)
        c_entities = output
    # print(c_entities)
    # resolve to a knowledge base via SPARQL (e.g. DBpedia)
    output = {}
    for entity in c_entities:
        try:
            result = run_sparql(ENDPOINT, """SELECT DISTINCT ?ent ?lbl WHERE { ?ent rdfs:label ?lbl. FILTER(regex(?lbl, "%s", "i")) } LIMIT 10""" % entity)
            candidate = []
            print('resolving "%s"...' % entity)
            # print(result["results"]["bindings"])
            for cand in result["results"]["bindings"]:
                cand_pair = (cand["ent"]["value"], cand["lbl"]["value"])
                if cand_pair not in candidate:
                    candidate.append(cand_pair)
            if len(candidate) > 0:
                output[entity] = candidate
        except Exception:
            # skip entities whose lookup fails
            pass
    c_entities = output
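    # Note: the entity string is interpolated into the regex pattern
    # unescaped, so regex metacharacters in a chunk (e.g. parentheses)
    # can break or distort the query; escaping them first would be safer.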
    # choose the best match using string distance (e.g. Levenshtein)
    output = {}
    for entity, candidates in c_entities.items():
        ename = entity.lower().replace("_", "").replace(" ", "")
        # group candidate IRIs by normalized label
        tmp = {}
        for iri, label in candidates:
            kname = label.lower().replace("_", "").replace(" ", "")
            if kname not in tmp:
                tmp[kname] = [iri]
            else:
                tmp[kname].append(iri)
        if len(tmp) == 1:
            # accept if there is only one alternative
            output[entity] = list(tmp.values())[0]
        else:
            # sort ascending by distance
            sorted_candidates = sorted([(kname, levenshtein(kname, ename)) for kname in tmp], key=lambda x: x[1])
            # accept if the smallest distance does not exceed the original string length
            if sorted_candidates[0][1] <= len(entity):
                output[entity] = tmp[sorted_candidates[0][0]]
    c_entities = output
    print(json.dumps(c_entities, indent=2))
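    # The printed result maps each surviving surface form to its accepted
    # candidate IRIs, e.g. (illustrative values only):
    # {
    #   "Jakarta": [
    #     "http://id.dbpedia.org/resource/Jakarta"
    #   ]
    # }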