Skip to content

Instantly share code, notes, and snippets.

@duhaime
Created August 27, 2021 16:43
Show Gist options
  • Save duhaime/aa132323aca8aa50587331267def162d to your computer and use it in GitHub Desktop.
Save duhaime/aa132323aca8aa50587331267def162d to your computer and use it in GitHub Desktop.
Download Wikipedia People Data
from SPARQLWrapper import SPARQLWrapper, JSON
from collections import defaultdict
from bs4 import BeautifulSoup
import numpy as np
import json, os, glob, subprocess, string
import nltk, codecs, re, shutil, sys
##################################################
# Wrapper to find nested location of stored data #
##################################################
def get_nested_path(filename):
    """Build the sharded storage path for a filename.

    Each of the first three characters of `filename` names one directory
    level, and the result always begins with a slash, e.g.
    "0428271.json" -> "/0/4/2/0428271.json".
    """
    first, second, third = filename[0], filename[1], filename[2]
    return "/%s/%s/%s/%s" % (first, second, third, filename)
#################################################
# Collect Wikipedia name and ids for all people #
#################################################
def send_sparql_query(page_number, results_per_page):
    """Return one page of dbpedia json listing people and their wiki page ids.

    A copy cached in the first sub directory is returned when it can be
    read. Otherwise dbpedia is queried, with up to 10 attempts and a 5
    second pause after each failure, and the response is written to that
    cache. Note: dbpedia returns a maximum of 10000 responses per query.

    Returns the parsed json response, or None if every attempt failed.
    """
    # imported here because the retry loop sleeps between attempts and the
    # module-level imports do not bring `time` into scope
    import time

    print("requesting page", page_number)
    # specify the name of the file where the data will be stored
    outfile_name = 'all_people_page_' + str(page_number) + '.json'
    sparql_json_file = out_dir + sub_dirs[0] + "/" + outfile_name
    # return the data from disk if possible
    try:
        with open(sparql_json_file) as sparql_json_in:
            return json.load(sparql_json_in)
    except (IOError, OSError, ValueError):
        # missing, unreadable or corrupt cache file: fall through to a request
        print("local copies of", sparql_json_file, "not found, so requesting data")
    # the query does not change between attempts, so build it once
    query = """select distinct ?person ?wikipediaPageid {
?person a dbo:Person ;
dbo:wikiPageID ?wikipediaPageid
} LIMIT """ + str(results_per_page) + \
        " OFFSET " + str(page_number * results_per_page)
    max_retries = 10
    retries = 0
    while retries < max_retries:
        try:
            # submit the query and return a json response
            sparql = SPARQLWrapper("http://dbpedia.org/sparql")
            sparql.setReturnFormat(JSON)
            sparql.setQuery(query)
            # convert the response to json
            sparql_json = sparql.query().convert()
            # write the response to disk
            write_json(sparql_json_file, sparql_json)
            return sparql_json
        except Exception as exc:
            # any failure (network, endpoint, disk) counts as one attempt
            print('could not retrieve sparql json due to error:', exc)
            time.sleep(5)
            retries += 1
    # worst case scenario
    return None
def _ensure_dir(path):
    """Create `path` unless it already exists as a directory."""
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except OSError:
        # another worker may have created it between the check and the
        # call; any other failure is a real error and must surface
        if not os.path.isdir(path):
            raise


def make_dirs(out_dir, sub_dirs):
    """Read in an out directory and a list of subdirectories,
    and write them all to disk.

    Under every entry of `sub_dirs` the three-level digit directories
    0/0/0 through 9/9/9 are created. `sub_dirs` entries are appended to
    `out_dir` by plain string concatenation.

    Raises OSError if a directory cannot be created and does not exist.
    """
    _ensure_dir(out_dir)
    digits = [str(d) for d in range(10)]
    for sub_dir in sub_dirs:
        dir_path = out_dir + sub_dir
        for i in digits:
            for j in digits:
                for k in digits:
                    _ensure_dir(dir_path + '/' + '/'.join([i, j, k]))
def write_json(file_name, json_object):
    """Serialize `json_object` as json into the file named `file_name`,
    replacing whatever that file held before"""
    with open(file_name, "w") as handle:
        handle.write(json.dumps(json_object))
def get_page(page_number, results_per_page):
    """Fetch one page of people json and report which page to fetch next.

    Returns `page_number + 1` when the page came back with exactly
    `results_per_page` bindings, otherwise 0.
    """
    sparql_json = send_sparql_query(page_number, results_per_page)
    if not sparql_json:
        return 0
    page_is_full = len(sparql_json["results"]["bindings"]) == results_per_page
    return page_number + 1 if page_is_full else 0
def get_json_on_all_wiki_people(start_page=0, end_page=False):
    """Request json on all people in dbpedia and write json on each
    page of results to disk.

    start_page: index of the first page to request.
    end_page: last page index to request; when left as False (or None)
        paging continues until a page comes back short.

    The page size is read from the module-level `results_per_page`.
    """
    # False is the "no limit" sentinel. Comparing against it directly would
    # treat it as 0 and stop after the first page, so test for it explicitly
    # (an end_page of 0 is still honoured as a real limit).
    has_limit = end_page is not False and end_page is not None
    # get the first page of results, if there are more
    # to get thereafter, carry on until finished
    next_page = get_page(0 + start_page, results_per_page)
    while next_page > 0:
        if has_limit and next_page > end_page:
            break
        next_page = get_page(next_page, results_per_page)
###############################################
# Collect DBPedia metadata on all wiki people #
###############################################
def parse_ids_from_json_page(page):
    """Load a saved page of sparql results and map each wikipedia page
    id to the person's resource name, with the dbpedia resource url
    removed from the name"""
    resource_url = "http://dbpedia.org/resource/"
    with open(page) as page_in:
        bindings = json.load(page_in)["results"]["bindings"]
    return dict(
        (row["wikipediaPageid"]["value"],
         row["person"]["value"].replace(resource_url, ""))
        for row in bindings
    )
def get_dbpedia_metadata(page_name, wikipedia_page_id):
    """Read in the case-sensitive name of a wikipedia page and
    return structured metadata on that page from dbpedia.

    A previously saved result for `wikipedia_page_id` is returned when it
    can be read from disk. Otherwise dbpedia is queried for every triple
    in which the resource is subject or object, the response is reduced
    by parse_dbpedia_metadata, and a non-empty result is saved to disk.
    """
    dbpedia_path = out_dir + sub_dirs[5] + "/" + get_nested_path(wikipedia_page_id) + "_dbpedia.json"
    try:
        with open(dbpedia_path) as dbpedia_in:
            return json.load(dbpedia_in)
    except (IOError, OSError, ValueError):
        # no usable cached copy (missing, unreadable or corrupt): query instead
        pass
    query = """
SELECT ?property ?hasValue ?isValueOf
WHERE {
{ <http://dbpedia.org/resource/""" + page_name + """> ?property ?hasValue }
UNION
{ ?isValueOf ?property <http://dbpedia.org/resource/""" + page_name + """> }
}"""
    # submit the query and return a json response
    sparql = SPARQLWrapper("http://dbpedia.org/sparql")
    sparql.setReturnFormat(JSON)
    sparql.setQuery(query)
    # parse the response
    response = sparql.query().convert()
    parsed_response = parse_dbpedia_metadata(response)
    # write the data to disk
    if parsed_response:
        write_json(dbpedia_path, parsed_response)
    return parsed_response
def parse_dbpedia_metadata(sparql_response):
    """Read in structured json from dbpedia, and return json
    that details fields of interest within the metadata.

    Returns a dict whose possible keys are "thumbnail", "birth_date",
    "death_date", "name" and "abstract". Only abstracts tagged as
    english are retained. When a property occurs more than once the
    last occurrence wins.
    """
    parsed_metadata = {}
    # map sparql fields of interest to a name for the field
    fields_of_interest = {
        "http://dbpedia.org/ontology/thumbnail": "thumbnail",
        "http://dbpedia.org/ontology/birthDate": "birth_date",
        "http://dbpedia.org/ontology/deathDate": "death_date",
        "http://xmlns.com/foaf/0.1/name": "name",
        "http://dbpedia.org/ontology/abstract": "abstract",
    }
    for field in sparql_response["results"]["bindings"]:
        label = fields_of_interest.get(field["property"]["value"])
        if label is None:
            continue
        # a binding may carry only "isValueOf" and no "hasValue";
        # such rows hold nothing to record
        has_value = field.get("hasValue")
        if not has_value:
            continue
        # only retain english abstracts
        if label == "abstract" and has_value.get("xml:lang") != "en":
            continue
        parsed_metadata[label] = has_value["value"]
    return parsed_metadata
###########################
# Text processing helpers #
###########################
def get_sentences(raw_html):
    """Extract the text of every <p> element in some html, join the
    paragraphs with spaces, and return the result split into sentences
    by the module-level sentence tokenizer"""
    soup = BeautifulSoup(raw_html, "html5lib")
    paragraphs = ["".join(p.findAll(text=True)) for p in soup.findAll('p')]
    return sentence_tokenizer.tokenize(" ".join(paragraphs))
def write_plaintext(wikipedia_id, sentence_array, plaintext_path):
    """Join a list of sentences with single spaces and write them as
    utf-8 text to `plaintext_path`. `wikipedia_id` is accepted but not
    used when writing"""
    plaintext_out = codecs.open(plaintext_path, "w", "utf-8")
    try:
        plaintext_out.write(" ".join(sentence_array))
    finally:
        plaintext_out.close()
def clean_text(s):
    """Strip parenthesised and square-bracketed spans from a string and
    collapse doubled commas, so the text is suitable for display in app"""
    # round braces first, then square braces
    for pattern in (r'\([^)]*\)', r'\[(.+?)\]'):
        s = re.sub(pattern, '', s)
    # ad-hoc required cleanups
    return s.replace(",,", ",")
def create_vsm(path_to_glove_file, sep=" ", load=False, expected_dims=None):
    """Read in a GloVe file and return a mapping from lowercased word to
    that word's vector in a vector space model.

    path_to_glove_file: text file with one word per line followed by its
        vector components, separated by `sep`.
    sep: separator between the word and each component.
    load: loading is switched off by default, in which case the file is
        never opened and None is returned; pass True to actually load.
    expected_dims: rows whose vector length differs are skipped; when
        None the module-level `dims` is used.

    Returns None when `load` is false, otherwise a defaultdict(list).
    Words whose components do not parse as floats are printed and skipped.
    """
    if not load:
        return None
    # imported here: io.open allows fixing the line terminator to "\n"
    import io

    if expected_dims is None:
        expected_dims = dims
    vsm = defaultdict(list)
    # stream the file rather than reading it whole; newline="\n" keeps
    # "\n" as the only line terminator, matching a split on "\n"
    with io.open(path_to_glove_file, "r", encoding="utf-8", newline="\n") as glove_in:
        for line in glove_in:
            fields = line.rstrip("\n").lower().split(sep)
            word = fields[0]
            try:
                word_vector = [float(value) for value in fields[1:]]
            except ValueError:
                print(word)
                continue
            if len(word_vector) != expected_dims:
                continue
            vsm[word] = word_vector
    return vsm
def clean_word(s):
    """Return a word with every character found in the module-level
    `punctuation` collection removed"""
    kept = []
    for char in s:
        if char in punctuation:
            continue
        kept.append(char)
    return ''.join(kept)
def get_doc_vector(s):
    """Read in a string and return a doc vector for that string.

    The vector is the per-dimension mean of the word vectors of the
    string's lowercased, punctuation-stripped, non-stop words, with each
    component formatted as a string with four decimal places.

    Returns None when no vector space model is available (the
    module-level `vsm` is missing, None or empty) or when none of the
    words has a vector of the expected size.
    """
    # `vsm` is only defined when the script entry point has run, and it
    # is None while model loading is switched off
    model = globals().get("vsm")
    if not model:
        return None
    words = [clean_word(w) for w in s.lower().split() if w not in stop_words]
    word_vector_list = []
    for w in words:
        # .get avoids inserting an empty entry for every unknown word
        word_vector = model.get(w)
        if word_vector is not None and len(word_vector) == dims:
            word_vector_list.append(word_vector)
    word_count = len(word_vector_list)
    if word_count == 0:
        # nothing to average; also avoids dividing by zero below
        return None
    # take the sum for each column
    column_sums = np.sum(word_vector_list, axis=0)
    # generate a length normalized vector with limited float precision
    return ["{0:.4f}".format(c / word_count) for c in column_sums]
##############################
# Metadata parsing utilities #
##############################
def parse_wiki_metadata(wikipedia_json):
    """Reduce a wikipedia page json record to the fields used on a card.

    Returns 0 when the record has no "extract". Otherwise returns a dict
    with "bio" (the cleaned extract up to its first "=", capped at 120
    words), "thumbnail" (the thumbnail source url, or "" when absent)
    and "doc_vector" (computed from the raw extract).
    """
    try:
        extract = wikipedia_json["extract"]
    except KeyError:
        return 0
    lead_words = clean_text(extract).split("=")[0].split()[:120]
    # rebuild the text, attaching stand-alone punctuation tokens to the
    # preceding word instead of putting a space before them
    pieces = []
    for position, word in enumerate(lead_words):
        if position > 0 and word not in (".", ",", ";"):
            pieces.append(" ")
        pieces.append(word)
    bio = "".join(pieces)
    # try to grab the image url
    try:
        thumbnail_url = wikipedia_json["thumbnail"]["source"]
    except KeyError:
        thumbnail_url = ""
    return {
        "bio": bio,
        "thumbnail": thumbnail_url,
        "doc_vector": get_doc_vector(extract)
    }
def get_image_and_text(wikipedia_id):
    """Read in a wikipedia id and return plaintext content
    suitable for displaying in card json.

    A previously saved result is returned when it can be read from disk.
    Otherwise the wikipedia api is queried with curl, the page record is
    reduced by parse_wiki_metadata, and a truthy result is saved.

    Returns the curated metadata, or 0 when the response holds no record
    for this page id or the record has no extract.
    """
    # if the data already exists on disk, return it
    image_and_text_path = out_dir + sub_dirs[3] + "/" + get_nested_path(wikipedia_id) + "_image_and_text.json"
    try:
        with open(image_and_text_path) as f:
            return json.load(f)
    except (IOError, OSError, ValueError):
        # no usable cached copy (missing, unreadable or corrupt)
        pass
    # else query for the data and write it to disk
    query = 'https://en.wikipedia.org/w/api.php?action=query'
    query += '&redirects=1&prop=pageimages|info|extracts'
    query += '&inprop=url&format=json&pithumbsize=300&explaintext'
    query += '&pageids=' + wikipedia_id
    # pass the url as its own argument so no shell ever interprets it
    response = subprocess.check_output(["curl", query])
    response_json = json.loads(response)
    try:
        card_json = response_json["query"]["pages"][str(wikipedia_id)]
        curated_metadata = parse_wiki_metadata(card_json)
        # write the image and text to disk
        if curated_metadata:
            write_json(image_and_text_path, curated_metadata)
        return curated_metadata
    except KeyError:
        return 0
def get_wikipedia_metadata(wikipedia_name, wikipedia_id, max_sentences=20):
    """Read in a wikipedia page name and id, make sure the page's
    plaintext is on disk, and return curated metadata for the page.

    When no plaintext file exists yet the page html is fetched with
    curl, split into sentences, and the first `max_sentences` sentences
    are written to disk.

    Returns the result of get_image_and_text, or 0 when curl fails.
    """
    # if the plaintext already exists on disk, use it, else fetch it
    plaintext_path = out_dir + sub_dirs[1] + "/" + get_nested_path(wikipedia_id) + "_plaintext.txt"
    if not os.path.isfile(plaintext_path):
        try:
            # the url is its own argument: page names containing quotes
            # or other shell metacharacters cannot break the command
            raw_html = subprocess.check_output(
                ["curl", "https://en.wikipedia.org/wiki/" + wikipedia_name])
            sentences = get_sentences(raw_html)
            write_plaintext(wikipedia_id, sentences[:max_sentences], plaintext_path)
        except (subprocess.CalledProcessError, OSError):
            # OSError covers curl itself being unavailable, which the
            # shell used to report as a failed command
            return 0
    # fetch json parsed by wiki that's fit for displaying on the client
    return get_image_and_text(wikipedia_id)
def get_page_view_stats(wikipedia_page_name, wikipedia_page_id):
    """Read in a wikipedia page name and return the aggregate
    number of times that page was accessed in October of 2015.

    A previously saved count is returned when it can be read from disk.
    Otherwise the wikimedia pageviews api is queried with curl, the
    daily "views" values are summed, and a non-zero total is saved.

    Returns the total, or 0 when curl fails or the response has no
    usable "items".
    """
    # if the pageview json exists on disk, use it
    page_views_path = out_dir + sub_dirs[4] + "/" + get_nested_path(wikipedia_page_id) + "_page_views.json"
    try:
        with open(page_views_path) as page_views_in:
            return json.load(page_views_in)
    except (IOError, OSError, ValueError):
        # no usable cached copy (missing, unreadable or corrupt)
        pass
    query = "https://wikimedia.org/api/rest_v1/metrics/pageviews/"
    query += "per-article/en.wikipedia/all-access/all-agents/"
    query += wikipedia_page_name + "/daily/2015100100/2015103100"
    try:
        # the url is its own argument so no shell ever interprets it
        response = subprocess.check_output(["curl", query])
        json_response = json.loads(response)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers curl itself being unavailable, which the
        # shell used to report as a failed command
        return 0
    try:
        page_views = sum([item["views"] for item in json_response["items"]])
        # write the page views data to disk
        if page_views:
            write_json(page_views_path, page_views)
        return page_views
    except KeyError:
        return 0
#############################
# Generate Card json output #
#############################
def get_thumbnail_image(wikipedia_page_id, thumbnail_url):
    """Read in a wikipedia page id and a url to that page's
    thumbnail, and fetch the thumbnail.

    The image is downloaded with wget unless a file already exists at
    the target path. Returns 0 once a file of at least 100 bytes is in
    place.

    Raises Exception('thumbnails are required') when the download fails
    or the file on disk is smaller than 100 bytes.
    """
    # copy the card thumbnail to the images directory if it's not there
    image_out_path = out_dir + sub_dirs[6]
    image_out_path += get_nested_path(wikipedia_page_id) + ".jpg"
    if not os.path.isfile(image_out_path):
        try:
            # url and path are separate arguments, so neither is ever
            # interpreted by a shell
            subprocess.check_output(["wget", thumbnail_url, "-O", image_out_path])
        except (subprocess.CalledProcessError, OSError):
            raise Exception('thumbnails are required')
    # validate the image file contains content
    if os.path.getsize(image_out_path) < 100:
        raise Exception('thumbnails are required')
    return 0
def get_thumbnail(wiki_parsed, dbpedia_parsed):
    """Read in parsed wiki and dbpedia json, and return the thumbnail
    url for the current page.

    The wiki thumbnail is preferred; the dbpedia one is the fallback.

    Raises Exception('thumbnails are required') when neither has one.
    """
    if wiki_parsed["thumbnail"]:
        return wiki_parsed["thumbnail"]
    # plain membership test works on every python version
    if "thumbnail" in dbpedia_parsed:
        return dbpedia_parsed["thumbnail"]
    raise Exception('thumbnails are required')
def get_plaintext(wikipedia_page_id):
    """Return the utf-8 plaintext stored on disk for a wikipedia page id"""
    nested_path = get_nested_path(wikipedia_page_id)
    plaintext_path = out_dir + sub_dirs[1] + "/" + nested_path + "_plaintext.txt"
    plaintext_in = codecs.open(plaintext_path, "r", "utf-8")
    try:
        return plaintext_in.read()
    finally:
        plaintext_in.close()
def get_bio(wiki_parsed, dbpedia_parsed):
    """Read in parsed wiki and dbpedia metadata and return the
    bio for the current card.

    The wiki bio is preferred; the dbpedia abstract is the fallback.

    Raises Exception('bios are required') when neither is available.
    """
    if wiki_parsed["bio"]:
        return wiki_parsed["bio"]
    # plain membership test works on every python version
    if "abstract" in dbpedia_parsed:
        return dbpedia_parsed["abstract"]
    raise Exception('bios are required')
def write_card_json(dbpedia_parsed, wiki_parsed):
    """Read in structured dbpedia metadata and parsed wiki metadata
    for the current record, and write the current card json to disk.

    `wiki_parsed` must hold "wikipedia_page_id", "wikipedia_page_name",
    "page_views", "bio" and "thumbnail"; "doc_vector" is optional. The
    card is written under `card_json_dir`, nested by the first three
    characters of the page id.

    Raises Exception when no thumbnail or bio can be found; nothing is
    written in that case.
    """
    wikipedia_page_id = wiki_parsed["wikipedia_page_id"]
    wikipedia_page_name = wiki_parsed["wikipedia_page_name"]
    # retrieve a thumbnail and bio or don't write the card
    thumbnail_url = get_thumbnail(wiki_parsed, dbpedia_parsed)
    get_thumbnail_image(wikipedia_page_id, thumbnail_url)
    bio = get_bio(wiki_parsed, dbpedia_parsed)
    # clean the agent's name
    name = clean_text(" ".join(wikipedia_page_name.split("_")))
    # try to retrieve the individuals dates
    birth_date = dbpedia_parsed.get("birth_date", '')
    death_date = dbpedia_parsed.get("death_date", '')
    # no doc vector is available when vector generation is switched
    # off; store an empty list rather than failing to write the card
    doc_vector = wiki_parsed.get("doc_vector") or []
    card_json = {
        # use the resolved bio so the dbpedia fallback is not lost
        "bio": bio,
        "birth_date": birth_date,
        "death_date": death_date,
        "name": name,
        "thumbnail": "./assets/images/" + wikipedia_page_id + ".jpg",
        "doc_vector": [float(v) for v in doc_vector],
        "page_views": wiki_parsed["page_views"],
        "plaintext": get_plaintext(wikipedia_page_id),
        "wikipedia_page_id": wikipedia_page_id,
        "wikipedia_page_name": wikipedia_page_name
    }
    out_file = wikipedia_page_id + "_card.json"
    shard_dirs = [wikipedia_page_id[i] for i in range(3)]
    out_dir_path = card_json_dir + "/" + "/".join(shard_dirs)
    if not os.path.exists(out_dir_path):
        os.makedirs(out_dir_path)
    write_json(out_dir_path + "/" + out_file, card_json)
def collect_metadata(wikipedia_id, wikipedia_name):
    """Gather dbpedia metadata, wikipedia metadata and page views for
    one person and write that person's card json to disk.

    Raises Exception when the wikipedia metadata or the page views
    come back empty.
    """
    # request all data if it doesn't exist on disk
    dbpedia_parsed = get_dbpedia_metadata(wikipedia_name, wikipedia_id)
    wiki_parsed = get_wikipedia_metadata(wikipedia_name, wikipedia_id)
    page_views = get_page_view_stats(wikipedia_name, wikipedia_id)
    required = (
        (wiki_parsed, 'bios are required'),
        (page_views, 'page views are required'),
    )
    for value, message in required:
        if not value:
            raise Exception(message)
    # compile and write the metadata to disk
    wiki_parsed.update({
        "wikipedia_page_name": wikipedia_name,
        "wikipedia_page_id": wikipedia_id,
        "page_views": page_views,
    })
    write_card_json(dbpedia_parsed, wiki_parsed)
def get_metadata_on_all_wiki_people():
    """Iterate over the pages of people json, collect
    and persist structured metadata on those people.

    Only pages whose number lies between the module-level `start_page`
    and `end_page` (inclusive) are processed. A failure for one person
    is printed, that person's id is appended to could_not_parse.log,
    and processing continues with the next person.
    """
    # define the path to the json with wikipedia ids and names
    wiki_id_json_pages = glob.glob(out_dir + sub_dirs[0] + "/*.json")
    # iterate over all pages that detail individual people
    for page in wiki_id_json_pages:
        # only process the pages currently in scope; the page number is
        # the text between the last underscore and the following dot
        page_number = int(page.split("_")[-1].split(".")[0])
        if page_number < start_page or page_number > end_page:
            continue
        page_ids_to_names = parse_ids_from_json_page(page)
        # iterate over each person on this page; iterating the dict
        # itself works on every python version
        for id_index, wikipedia_id in enumerate(page_ids_to_names):
            print("fetching:", page_number, id_index, wikipedia_id, "\n")
            try:
                wikipedia_name = page_ids_to_names[wikipedia_id]
                collect_metadata(wikipedia_id, wikipedia_name)
            except Exception as exc:
                # best effort: one bad record must not stop the whole run
                print(exc)
                with open("could_not_parse.log", "a") as err_out:
                    err_out.write(wikipedia_id + "\n")
if __name__ == "__main__":
    # output locations: downloaded data goes under out_dir, one sub
    # directory per kind of data, and finished cards under card_json_dir
    out_dir = "collected_data/"
    sub_dirs = [
        "people_pages",
        "wikipedia_text",
        "corenlp_json",
        "wiki_image_and_text",
        "page_views",
        "dbpedia",
        "images",
    ]
    card_json_dir = "../../card_json"
    make_dirs(out_dir, sub_dirs)

    # text processing resources, read as module globals by the helpers
    sentence_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
    stop_words = set(nltk.corpus.stopwords.words('english'))
    punctuation = set(string.punctuation)
    dims = 300
    vsm = create_vsm("utils/glove.840B.300d.txt")

    # get json on all people in dbpedia using sge submission script array_job.sh
    # https://gist.github.com/duhaime/fd11900e763e8ceb1a13798ec54c9c3e
    # the optional first argument is a 1-based start page
    if len(sys.argv) > 1:
        start_page = int(sys.argv[1]) - 1
    else:
        start_page = 0
    end_page = start_page + 1000
    results_per_page = 100
    get_json_on_all_wiki_people(start_page=start_page, end_page=end_page)
    # keep a record of the page range this run covered
    with open('arg_log.txt', 'a') as out:
        out.write("%s %s\n" % (start_page, end_page))

    # get structured metadata on all people in dbpedia
    get_metadata_on_all_wiki_people()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment