Created October 18, 2023 13:48
Python script: Retrieve Wikipedia paragraphs, preprocess, and index with Xapian for efficient term-based searches.
import re
import json
import requests
import xapian
import nltk

# nltk.sent_tokenize (used below) needs the punkt models: nltk.download("punkt")

# Overwrites the DB if it exists. To add to an existing DB, use xapian.DB_CREATE_OR_OPEN
db = xapian.WritableDatabase("./toy_db", xapian.DB_CREATE_OR_OVERWRITE)
termgenerator = xapian.TermGenerator()
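# Optional, not in the original gist: enable stemming so inflected word forms
# match at search time. The same stemmer would then also have to be set on the
# QueryParser used below.
# termgenerator.set_stemmer(xapian.Stem("en"))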
def clean_html(par):
    # drop anchor spans, unwrap <i> and <li> tags, strip <p> wrappers
    par = re.sub(r'<span id.*?</span>', '', par)
    par = re.sub(r'<i>(.*?)</i>', lambda m: m.group(1), par)
    par = re.sub(r'(?:<ol>)?<li>(.*?)</li>(?:</ol>)?', lambda m: m.group(1), par)
    par = par.replace("<p>", "").replace("</p>", "")
    par = par.replace("\n", ' ').strip()
    return par
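# Illustrative example (hypothetical input, not from the original script):
# clean_html('<p>The <i>Internet</i> is a global network.</p>\n')
# returns 'The Internet is a global network.'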
def load_pars(titles, unnecessary_sections):
    S = requests.Session()
    url = "https://en.wikipedia.org/w/api.php"
    section2par = {}
    for title in titles:
        params = {
            "action": "query",
            "prop": "extracts",
            "titles": title,
            "format": "json",
        }
        r = S.get(url=url, params=params)
        data = r.json()
        for key in data['query']['pages']:
            print(f"Processing: {data['query']['pages'][key]['title']}")
            page_content = data['query']['pages'][key]['extract']
            # split the extract on <h2> headings; the lookahead also keeps the final section
            sections = re.findall(r'<h2>.*?(?=<h2>|$)', page_content, flags=re.DOTALL)
            for section in sections:
                clean_pars = []
                section_title = re.findall(r'<h2>(.*?)</h2>', section, flags=re.DOTALL)
                section_title = re.sub(r'<.*?>', '', section_title[0])
                if section_title.lower() not in unnecessary_sections:
                    pars = re.findall(r'(<p>.*?</p>\n?(?:<ol>.*?</ol>)?)', section, flags=re.DOTALL)
                    for par in pars:
                        clean_pars.append(clean_html(par))
                    section2par[f"{title}, {section_title}"] = " ".join(clean_pars)
    return section2par
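# Illustrative shape of the returned mapping (the keys and values here are
# made-up examples, not actual output):
# {"Internet, History": "The Internet arose from research ...",
#  "Internet, Governance": "..."}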
titles = ["Search engine", "Technology", "Web query", "Internet", "Information"]
unnecessary_sections = ['see also', 'references', 'further reading']
data = load_pars(titles, unnecessary_sections)
def index_chunk(text, title, idx):
    # build one Xapian document: prefixed terms for idx/title, plus the free text
    doc = xapian.Document()
    termgenerator.set_document(doc)
    termgenerator.index_text(str(idx), 1, 'WIKI')
    termgenerator.index_text(title, 1, 'TITLE')
    termgenerator.index_text(text)
    termgenerator.index_text(title)
    fields = {"idx": str(idx), "title": title, "text": text}
    doc.set_data(json.dumps(fields))
    db.add_document(doc)

idx = 0
for title, page in data.items():
    # index small chunks of text with a max length of ~100 whitespace tokens
    for par in page.split("\n"):
        curr_len = 0
        text = ""
        for sent in nltk.sent_tokenize(par):
            text += f"{sent} "
            curr_len += len(sent.split())
            # once a chunk reaches 100 tokens, add it to the DB
            if curr_len >= 100:
                index_chunk(text, title, idx)
                idx += 1
                curr_len = 0
                text = ""
                if idx % 100 == 0:
                    print(f"Added {idx} docs")
        # flush the trailing chunk so text shorter than 100 tokens is not dropped
        if text.strip():
            index_chunk(text, title, idx)
            idx += 1
print("finished")
# Actual search
def load(db_path: str = "./toy_db"):
    db = xapian.WritableDatabase(db_path, xapian.DB_CREATE_OR_OPEN)
    print("Doc count: ", db.get_doccount())
    queryparser = xapian.QueryParser()
    queryparser.add_prefix("idx", "WIKI")
    queryparser.add_prefix("title", "TITLE")
    # match documents where all subqueries match; change to OP_OR to match any
    queryparser.set_default_op(xapian.Query.OP_AND)
    return db, queryparser
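# E.g. 'title:Internet' is parsed into a TITLE-prefixed term, while bare words
# search the unprefixed free text indexed above.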
def get_by_idx(idx, db, queryparser):
    # returns the stored fields of the first match, or None if nothing matches
    query = queryparser.parse_query(f'idx:"{idx}"')
    enquire = xapian.Enquire(db)
    enquire.set_query(query)
    for match in enquire.get_mset(0, 1):
        fields = json.loads(match.document.get_data())
        return fields
def search_by_title(title, db, queryparser, top_k=10):
    # top_k may be an int or the string "all" to return every match
    query = queryparser.parse_query(f"title:{title}")
    enquire = xapian.Enquire(db)
    enquire.set_query(query)
    if top_k == "all":
        top_k = db.get_doccount()
    res = []
    for match in enquire.get_mset(0, top_k):
        fields = json.loads(match.document.get_data())
        res.append(fields)
    return res
def search_fuzzy_match(term, db, queryparser, top_k=10):
    # free-text search over the indexed chunks; "all" returns every match
    query = queryparser.parse_query(term)
    enquire = xapian.Enquire(db)
    enquire.set_query(query)
    if top_k == "all":
        top_k = db.get_doccount()
    res = []
    for match in enquire.get_mset(0, top_k):
        fields = json.loads(match.document.get_data())
        res.append(fields)
    return res
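# Usage sketch: the query strings below are illustrative assumptions, not part
# of the original gist. The writer is closed first so load() can reopen the DB.
db.close()
db, queryparser = load("./toy_db")
print(get_by_idx(0, db, queryparser))
print(search_by_title("Internet", db, queryparser, top_k=5))
print(search_fuzzy_match("web crawler", db, queryparser, top_k=3))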