Download WikiPedia People Data
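# Usage sketch (not part of the original script; the filename below is assumed):
#   python download_wikipedia_people.py 1
# The optional argument is a 1-based page offset used by the SGE array job noted
# in __main__. The script shells out to curl and wget, and expects SPARQLWrapper,
# beautifulsoup4, html5lib, numpy, and nltk (with the 'punkt' and 'stopwords'
# data) to be installed, plus GloVe vectors at utils/glove.840B.300d.txt.
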
from SPARQLWrapper import SPARQLWrapper, JSON
from collections import defaultdict
from bs4 import BeautifulSoup
import numpy as np
import json, os, glob, subprocess, string
import nltk, codecs, re, shutil, sys, time

##################################################
# Wrapper to find nested location of stored data #
##################################################

def get_nested_path(filename):
    """Read in a filename (e.g. 0428271.json) and return the
    series of 3 subdirectories in which that file is stored
    plus the filename itself (e.g. /0/4/2/0428271.json)"""
    nested_dirs = "/".join(filename[:3])
    return "/" + nested_dirs + "/" + filename

#################################################
# Collect Wikipedia name and ids for all people #
#################################################

def send_sparql_query(page_number, results_per_page):
    """Return dbpedia json on all people in dbpedia. Note:
    dbpedia returns a maximum of 10000 responses per query"""
    print("requesting page", page_number)
    # specify the name of the file where the data will be stored
    outfile_name = 'all_people_page_' + str(page_number) + '.json'
    # return the data from disk if possible
    try:
        sparql_json_file = out_dir + sub_dirs[0] + "/" + outfile_name
        with open(sparql_json_file) as sparql_json_in:
            sparql_json = json.load(sparql_json_in)
            return sparql_json
    except (OSError, ValueError):
        print("local copy of", sparql_json_file, "not found, so requesting data")
    max_retries = 10
    retries = 0
    while retries < max_retries:
        try:
            query = """select distinct ?person ?wikipediaPageid {
              ?person a dbo:Person ;
                dbo:wikiPageID ?wikipediaPageid
            } LIMIT """ + str(results_per_page) + \
                " OFFSET " + str(page_number * results_per_page)
            # submit the query and return a json response
            sparql = SPARQLWrapper("http://dbpedia.org/sparql")
            sparql.setReturnFormat(JSON)
            sparql.setQuery(query)
            # convert the response to json
            sparql_json = sparql.query().convert()
            # write the response to disk
            write_json(out_dir + sub_dirs[0] + "/" + outfile_name, sparql_json)
            return sparql_json
        except Exception as exc:
            print('could not retrieve sparql json due to error:', exc)
            time.sleep(5)
            retries += 1
    # worst-case scenario: all retries failed
    return None
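
# paging sketch: with results_per_page = 100 (as set in __main__ below), page 3
# issues "... LIMIT 100 OFFSET 300", i.e. the fourth block of 100 people.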

def make_dirs(out_dir, sub_dirs):
    """Read in an out directory and a list of subdirectories,
    and write them all to disk"""
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    for sub_dir in sub_dirs:
        dir_path = out_dir + sub_dir
        for i in range(10):
            for j in range(10):
                for k in range(10):
                    outpath = dir_path + '/' + '/'.join(str(n) for n in [i, j, k])
                    # exist_ok avoids a race condition across workers
                    os.makedirs(outpath, exist_ok=True)

def write_json(file_name, json_object):
    """Read in a filename and a json packet, and write the latter
    to the former"""
    with open(file_name, "w") as json_out:
        json.dump(json_object, json_out)

def get_page(page_number, results_per_page):
    """Send a sparql query for one page of results and return the
    number of the next page to fetch (or 0 if this was the last page)"""
    sparql_json = send_sparql_query(page_number, results_per_page)
    if sparql_json and len(sparql_json["results"]["bindings"]) == results_per_page:
        return page_number + 1
    else:
        return 0

def get_json_on_all_wiki_people(start_page=0, end_page=None):
    """Request json on all people in dbpedia and
    write json on each to disk. If the user passes start and/or
    end pages, start and end on those pages"""
    # get the first page of results; if there are more
    # to get thereafter, carry on until finished
    next_page = get_page(start_page, results_per_page)
    while next_page > 0:
        if end_page is not None and next_page > end_page:
            break
        next_page = get_page(next_page, results_per_page)

###############################################
# Collect DBPedia metadata on all wiki people #
###############################################

def parse_ids_from_json_page(page):
    """Read in a page of json responses with wiki ids
    and names, and return a clean dictionary mapping of
    id to name"""
    d = {}
    with open(page) as f:
        j = json.load(f)
        for person in j["results"]["bindings"]:
            wikipedia_id = person["wikipediaPageid"]["value"]
            wikipedia_name = person["person"]["value"]
            d[wikipedia_id] = wikipedia_name.replace("http://dbpedia.org/resource/", "")
    return d
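
# shape of the returned mapping (hypothetical id and resource name):
#   {"1234567": "Ada_Lovelace", ...}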

def get_dbpedia_metadata(page_name, wikipedia_page_id):
    """Read in the case-sensitive name of a wikipedia page and
    return structured metadata on that page from dbpedia"""
    dbpedia_path = out_dir + sub_dirs[5] + "/" + get_nested_path(wikipedia_page_id) + "_dbpedia.json"
    # return the cached response from disk if possible
    try:
        with open(dbpedia_path) as dbpedia_in:
            response = json.load(dbpedia_in)
            return response
    except (OSError, ValueError):
        pass
    query = """
    SELECT ?property ?hasValue ?isValueOf
    WHERE {
      { <http://dbpedia.org/resource/""" + page_name + """> ?property ?hasValue }
      UNION
      { ?isValueOf ?property <http://dbpedia.org/resource/""" + page_name + """> }
    }"""
    # submit the query and return a json response
    sparql = SPARQLWrapper("http://dbpedia.org/sparql")
    sparql.setReturnFormat(JSON)
    sparql.setQuery(query)
    # parse the response
    response = sparql.query().convert()
    parsed_response = parse_dbpedia_metadata(response)
    # write the data to disk
    if parsed_response:
        write_json(dbpedia_path, parsed_response)
    return parsed_response

def parse_dbpedia_metadata(sparql_response):
    """Read in structured json from dbpedia, and return json
    that details fields of interest within the metadata"""
    parsed_metadata = {}
    # map sparql fields of interest to a name for the field
    fields_of_interest = {
        "http://dbpedia.org/ontology/thumbnail": "thumbnail",
        "http://dbpedia.org/ontology/birthDate": "birth_date",
        "http://dbpedia.org/ontology/deathDate": "death_date",
        "http://xmlns.com/foaf/0.1/name": "name",
        "http://dbpedia.org/ontology/abstract": "abstract"
    }
    for field in sparql_response["results"]["bindings"]:
        for field_of_interest, label in fields_of_interest.items():
            if field["property"]["value"] == field_of_interest:
                # only retain english abstracts
                if label == "abstract":
                    if field["hasValue"]["xml:lang"] != "en":
                        continue
                parsed_metadata[label] = field["hasValue"]["value"]
    return parsed_metadata
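
# shape of the parsed dbpedia metadata; only keys present in the response appear:
#   {"thumbnail": ..., "birth_date": ..., "death_date": ..., "name": ..., "abstract": ...}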

###########################
# Text processing helpers #
###########################

def get_sentences(raw_html):
    """Read in some html and return plaintext sentences"""
    paragraphs = []
    soup = BeautifulSoup(raw_html, "html5lib")
    for node in soup.find_all('p'):
        paragraphs.append(node.get_text())
    text = " ".join(paragraphs)
    return sentence_tokenizer.tokenize(text)

def write_plaintext(wikipedia_id, sentence_array, plaintext_path):
    """Read in a wikipedia id and a list of sentences,
    and write the latter to disk"""
    with codecs.open(plaintext_path, "w", "utf-8") as out:
        out.write(" ".join(sentence_array))


def clean_text(s):
    """Read in a string and return the string in a format
    suitable for display in app"""
    # remove content between round braces, then square braces
    s = re.sub(r'\([^)]*\)', '', s)
    s = re.sub(r'\[(.+?)\]', '', s)
    # ad-hoc required cleanups
    s = s.replace(",,", ",")
    return s

def create_vsm(path_to_glove_file, sep=" "):
    """Read in a GloVe file, split each line on the sep character,
    and return a mapping from each word string to that word's
    dims-dimensional position in a vector space model"""
    vsm = defaultdict(list)
    with codecs.open(path_to_glove_file, "r", "utf-8") as f:
        f = f.read().lower().split("\n")
        for r in f:
            try:
                sr = r.split(sep)
                word = sr[0]
                word_vector = [float(i) for i in sr[1:]]
                if len(word_vector) != dims:
                    continue
                vsm[word] = word_vector
            except ValueError:
                print(r.split(sep)[0])
    return vsm
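
# each GloVe line is expected to be a word followed by dims space-separated
# floats (300 for glove.840B.300d.txt), e.g. with illustrative values:
#   the 0.4180 0.2497 -0.4124 ...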

def clean_word(s):
    """Read in a word and return that word in clean form"""
    return ''.join(c for c in s if c not in punctuation)


def get_doc_vector(s):
    """Read in a string and return a doc vector for that string"""
    word_vector_list = []
    words = [clean_word(w) for w in s.lower().split() if w not in stop_words]
    word_count = 0
    for w in words:
        word_vector = vsm.get(w, [])
        if len(word_vector) == dims:
            word_vector_list.append(word_vector)
            word_count += 1
    # guard against strings with no in-vocabulary words
    if word_count == 0:
        return ["{0:.4f}".format(0.0)] * dims
    # take the sum for each column
    column_sums = np.sum(word_vector_list, axis=0)
    # generate a mean vector with limited float precision
    normalized = ["{0:.4f}".format(c / word_count) for c in column_sums]
    return normalized
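
# toy example of the averaging above with dims = 2 (not real GloVe values):
#   [1.0, 2.0] and [3.0, 4.0] -> column sums [4.0, 6.0] -> ['2.0000', '3.0000']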

##############################
# Metadata parsing utilities #
##############################

def parse_wiki_metadata(wikipedia_json):
    """Read in a json response from wikipedia and return
    plaintext that contains the text content in that json"""
    # parse out clean plaintext
    punct = [".", ",", ";"]
    extract_text = ''
    try:
        extract = wikipedia_json["extract"]
        extract_words = clean_text(extract).split("=")[0].split()[:120]
    except KeyError:
        return 0
    for c, w in enumerate(extract_words):
        if c == 0:
            extract_text += w
        elif w in punct:
            extract_text += w
        else:
            extract_text += " " + w
    # try to grab the image url
    try:
        thumbnail_url = wikipedia_json["thumbnail"]["source"]
    except KeyError:
        thumbnail_url = ""
    response = {
        "bio": extract_text,
        "thumbnail": thumbnail_url,
        "doc_vector": get_doc_vector(extract)
    }
    return response

def get_image_and_text(wikipedia_id):
    """Read in a wikipedia id and return plaintext content
    suitable for displaying in card json"""
    # if the data already exists on disk, return it
    image_and_text_path = out_dir + sub_dirs[3] + "/" + get_nested_path(wikipedia_id) + "_image_and_text.json"
    try:
        with open(image_and_text_path) as f:
            return json.load(f)
    except (OSError, ValueError):
        pass
    # else query for the data and write it to disk
    query = 'https://en.wikipedia.org/w/api.php?action=query'
    query += '&redirects=1&prop=pageimages|info|extracts'
    query += '&inprop=url&format=json&pithumbsize=300&explaintext'
    query += '&pageids=' + wikipedia_id
    request = "curl '" + query + "'"
    response = subprocess.check_output([request], shell=True)
    response_json = json.loads(response)
    try:
        card_json = response_json["query"]["pages"][str(wikipedia_id)]
        curated_metadata = parse_wiki_metadata(card_json)
        # write the image and text to disk
        if curated_metadata:
            write_json(image_and_text_path, curated_metadata)
        return curated_metadata
    except KeyError:
        return 0

def get_wikipedia_metadata(wikipedia_name, wikipedia_id, max_sentences=20):
    """Read in a wikipedia page name and fetch text from
    that page via a curl request"""
    # if the plaintext already exists on disk, use it, else fetch it
    plaintext_path = out_dir + sub_dirs[1] + "/" + get_nested_path(wikipedia_id) + "_plaintext.txt"
    if not os.path.isfile(plaintext_path):
        try:
            request = "curl 'https://en.wikipedia.org/wiki/" + wikipedia_name + "'"
            raw_html = subprocess.check_output([request], shell=True)
            sentences = get_sentences(raw_html)
            write_plaintext(wikipedia_id, sentences[:max_sentences], plaintext_path)
        except subprocess.CalledProcessError:
            return 0
    # fetch json parsed by wiki that's fit for displaying on the client
    curated_metadata = get_image_and_text(wikipedia_id)
    return curated_metadata

def get_page_view_stats(wikipedia_page_name, wikipedia_page_id):
    """Read in a wikipedia page name and return the aggregate
    number of times that page was accessed in October of 2015"""
    # if the pageview json exists on disk, use it
    page_views_path = out_dir + sub_dirs[4] + "/" + get_nested_path(wikipedia_page_id) + "_page_views.json"
    try:
        with open(page_views_path) as page_views_in:
            return json.load(page_views_in)
    except (OSError, ValueError):
        pass
    query = "https://wikimedia.org/api/rest_v1/metrics/pageviews/"
    query += "per-article/en.wikipedia/all-access/all-agents/"
    query += wikipedia_page_name + "/daily/2015100100/2015103100"
    try:
        request = 'curl "' + query + '"'
        response = subprocess.check_output([request], shell=True)
        json_response = json.loads(response)
    except (subprocess.CalledProcessError, ValueError):
        return 0
    try:
        page_views = sum(item["views"] for item in json_response["items"])
        # write the page views data to disk
        if page_views:
            write_json(page_views_path, page_views)
        return page_views
    except KeyError:
        return 0
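
# the pageviews endpoint is expected to return json shaped like
#   {"items": [{"views": <int>, ...}, ...]}  (one item per day in the range),
# which the code above sums into a single integer.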

#############################
# Generate Card json output #
#############################

def get_thumbnail_image(wikipedia_page_id, thumbnail_url):
    """Read in a wikipedia page id and a url to that page's
    thumbnail, and fetch the thumbnail"""
    # copy the card thumbnail to the images directory if it's not there
    image_out_path = out_dir + sub_dirs[6]
    image_out_path += get_nested_path(wikipedia_page_id) + ".jpg"
    if not os.path.isfile(image_out_path):
        try:
            subprocess.check_output(["wget '" + thumbnail_url + "' -O " + image_out_path], shell=True)
        except subprocess.CalledProcessError:
            raise Exception('thumbnails are required')
        # validate the image file contains content
        if os.path.getsize(image_out_path) < 100:
            raise Exception('thumbnails are required')
        else:
            return 0

def get_thumbnail(wiki_parsed, dbpedia_parsed):
    """Read in parsed wiki and dbpedia json, and return the thumbnail
    for the current page"""
    if wiki_parsed["thumbnail"]:
        return wiki_parsed["thumbnail"]
    elif "thumbnail" in dbpedia_parsed:
        return dbpedia_parsed["thumbnail"]
    else:
        raise Exception('thumbnails are required')


def get_plaintext(wikipedia_page_id):
    """Read in a wikipedia page id and return that page's plaintext"""
    plaintext_path = out_dir + sub_dirs[1] + "/" + get_nested_path(wikipedia_page_id) + "_plaintext.txt"
    with codecs.open(plaintext_path, "r", "utf-8") as plaintext_in:
        return plaintext_in.read()


def get_bio(wiki_parsed, dbpedia_parsed):
    """Read in parsed wiki and dbpedia metadata and return the
    bio for the current card"""
    if wiki_parsed["bio"]:
        return wiki_parsed["bio"]
    elif "abstract" in dbpedia_parsed:
        return dbpedia_parsed["abstract"]
    else:
        raise Exception('bios are required')

def write_card_json(dbpedia_parsed, wiki_parsed):
    """Read in structured dbpedia metadata and parsed wikipedia metadata
    for the current record, and write the current card json to disk"""
    wikipedia_page_id = wiki_parsed["wikipedia_page_id"]
    wikipedia_page_name = wiki_parsed["wikipedia_page_name"]
    # retrieve a thumbnail and bio or don't write the card
    thumbnail_url = get_thumbnail(wiki_parsed, dbpedia_parsed)
    # download and validate the thumbnail (raises if unavailable)
    get_thumbnail_image(wikipedia_page_id, thumbnail_url)
    bio = get_bio(wiki_parsed, dbpedia_parsed)
    # clean the agent's name
    name = clean_text(" ".join(wikipedia_page_name.split("_")))
    # try to retrieve the individual's dates
    birth_date = dbpedia_parsed["birth_date"] if "birth_date" in dbpedia_parsed else ''
    death_date = dbpedia_parsed["death_date"] if "death_date" in dbpedia_parsed else ''
    card_json = {
        "bio": bio,
        "birth_date": birth_date,
        "death_date": death_date,
        "name": name,
        "thumbnail": "./assets/images/" + wikipedia_page_id + ".jpg",
        "doc_vector": [float(v) for v in wiki_parsed["doc_vector"]],
        "page_views": wiki_parsed["page_views"],
        "plaintext": get_plaintext(wikipedia_page_id),
        "wikipedia_page_id": wikipedia_page_id,
        "wikipedia_page_name": wikipedia_page_name
    }
    out_file = wikipedia_page_id + "_card.json"
    # nest the card under the first three digits of the page id
    out_dir_path = card_json_dir + "/" + "/".join(wikipedia_page_id[:3])
    os.makedirs(out_dir_path, exist_ok=True)
    write_json(out_dir_path + "/" + out_file, card_json)

def collect_metadata(wikipedia_id, wikipedia_name):
    """Read in the id and name of a person in wikipedia, acquire
    metadata on that individual, and write it to disk"""
    # request all data if it doesn't exist on disk
    dbpedia_parsed = get_dbpedia_metadata(wikipedia_name, wikipedia_id)
    wiki_parsed = get_wikipedia_metadata(wikipedia_name, wikipedia_id)
    page_views = get_page_view_stats(wikipedia_name, wikipedia_id)
    if not wiki_parsed:
        raise Exception('bios are required')
    if not page_views:
        raise Exception('page views are required')
    # compile and write the metadata to disk
    wiki_parsed["wikipedia_page_name"] = wikipedia_name
    wiki_parsed["wikipedia_page_id"] = wikipedia_id
    wiki_parsed["page_views"] = page_views
    write_card_json(dbpedia_parsed, wiki_parsed)

def get_metadata_on_all_wiki_people():
    """Iterate over the pages of people json, collect
    and persist structured metadata on those people"""
    # define the path to the json with wikipedia ids and names
    wiki_id_json_pages = glob.glob(out_dir + sub_dirs[0] + "/*.json")
    # iterate over all pages that detail individual people
    for page in wiki_id_json_pages:
        # only process the pages currently in scope
        page_number = int(page.split("_")[-1].split(".")[0])
        if page_number < start_page:
            continue
        if page_number > end_page:
            continue
        page_ids_to_names = parse_ids_from_json_page(page)
        # iterate over each person on this page
        for id_index, wikipedia_id in enumerate(page_ids_to_names):
            print("fetching:", page_number, id_index, wikipedia_id, "\n")
            try:
                wikipedia_name = page_ids_to_names[wikipedia_id]
                collect_metadata(wikipedia_id, wikipedia_name)
            except Exception as exc:
                print(exc)
                with open("could_not_parse.log", "a") as err_out:
                    err_out.write(wikipedia_id + "\n")

if __name__ == "__main__":
    # specify the output directories
    out_dir = "collected_data/"
    sub_dirs = ["people_pages", "wikipedia_text", "corenlp_json", "wiki_image_and_text",
                "page_views", "dbpedia", "images"]
    card_json_dir = "../../card_json"
    make_dirs(out_dir, sub_dirs)
    # identify required text processing resources
    sentence_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
    stop_words = set(nltk.corpus.stopwords.words('english'))
    punctuation = set(string.punctuation)
    dims = 300
    vsm = create_vsm("utils/glove.840B.300d.txt")
    # get json on all people in dbpedia using sge submission script array_job.sh
    # https://gist.github.com/duhaime/fd11900e763e8ceb1a13798ec54c9c3e
    start_page = int(sys.argv[1]) - 1 if len(sys.argv) > 1 else 0
    end_page = start_page + 1000
    results_per_page = 100
    get_json_on_all_wiki_people(start_page=start_page, end_page=end_page)
    with open('arg_log.txt', 'a') as out:
        out.write(str(start_page) + ' ' + str(end_page) + '\n')
    # get structured metadata on all people in dbpedia
    get_metadata_on_all_wiki_people()
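
# Output layout, derived from the functions above: raw SPARQL pages land in
# collected_data/people_pages/, per-person artifacts in the other collected_data/
# subdirectories under nested 0-9/0-9/0-9 folders, and the final cards in
# ../../card_json/<i>/<j>/<k>/<wikipedia_page_id>_card.json.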