@bpgergo · Created October 4, 2012
Download people info from Wikipedia, group people by zodiac
# coding: utf-8
import sys
import requests
from lxml import etree
import re
import pickle
from datetime import datetime
'''
feeble but working
http://stackoverflow.com/questions/3274597/how-would-i-determine-zodiac-astrological-star-sign-from-a-birthday-in-python/3274654#3274654
'''
zodiacs = [(120, 'Cap'), (218, 'Aqu'), (320, 'Pis'), (420, 'Ari'), (521, 'Tau'),
           (621, 'Gem'), (722, 'Can'), (823, 'Leo'), (923, 'Vir'), (1023, 'Lib'),
           (1122, 'Sco'), (1222, 'Sag'), (1231, 'Cap')]
def get_zodiac_of_date(date):
    date_number = int("%d%02d" % (date.month, date.day))
    for z in zodiacs:
        if date_number < z[0]:
            return z[1]
    return None
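# Quick sanity check (illustrative, assuming the table above):
#   get_zodiac_of_date(datetime(1948, 2, 28)) encodes the date as 228,
#   which falls below the 320 boundary, so it returns 'Pis'.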
url_cache_file_name = 'url_cache.p'
name_list_file_name = 'names.p'
name_db_file_name = 'name_db.p'
zodiac_db_file_name = 'zodiac_db.p'
'''
load object from file
return None if the file is missing or unreadable
'''
def get_cache(file_name):
    try:
        return pickle.load(open(file_name, "rb"))
    except Exception as e:
        sys.stderr.write("ERROR WHILE LOADING OBJECT FROM FILE:%s; EXCEPTION:%s\n" % (file_name, e))
        return None
'''
save object to file
'''
def save_cache(cache, file_name):
    pickle.dump(cache, open(file_name, "wb"))
'''
return the content of a URL
look up URL in a cache and skip the actual downloading if the URL is found in cache
TODO if the URL is redirected, then return new URL
'''
def download_url(url, cache=None):
    if not url:
        return None, None
    if cache is not None and url in cache:
        return url, cache[url]
    else:
        r = requests.get(url)
        if r.status_code == 200:
            if cache is not None:
                cache[url] = r.text
                save_cache(cache, url_cache_file_name) #save cache immediately (maybe too eager?)
            return url, r.text
    return None, None
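# The cache is a plain dict mapping URL -> page text; a sketch of its use
# (hypothetical URL):
#   cache = {}
#   url, page = download_url('http://en.wikipedia.org/wiki/Special:Export/Aristotle', cache)
#   # a second call with the same dict serves the text from memory, no HTTP request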
'''
this function is specific to Wikipedia's Special:Export format
see http://en.wikipedia.org/wiki/Special:Export
'''
def extract_page_text(text):
    if not text:
        return None
    # lxml refuses unicode input that carries an XML encoding declaration,
    # so pass UTF-8 bytes instead
    root = etree.XML(text.encode('utf-8'))
    try:
        return list(root.iter(tag='{%s}text' % root.nsmap[None]))[0].text
    except Exception:
        pass
    return None
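# The export XML is shaped roughly like this (abridged, version number elided):
#   <mediawiki xmlns="http://www.mediawiki.org/xml/export-0.x/">
#     <page><revision><text>...wikitext...</text></revision></page>
#   </mediawiki>
# root.nsmap[None] is that default namespace, hence the '{%s}text' tag lookup.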
def extract_pattern_from_line(line, pattern):
    if not line:
        return None
    search = pattern.search(line)
    if search:
        return search.groups()[0]
    return None
'''
return names from a listing
list item looks like this:
*[[Aristotle]] — Athens, Greece (384–322&nbsp;BC)
'''
name_pattern = re.compile(r'^\*\[\[(.*)\]\]')
def extract_names(text):
    if not text:
        return None
    result = []
    for line in text.split('\n'):
        name = extract_pattern_from_line(line, name_pattern)
        if name:
            result.append(name)
        if "== See also ==" in line:
            break
    return result
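# For example, the listing line '*[[Aristotle]] — Athens, Greece (384–322&nbsp;BC)'
# yields 'Aristotle'; everything after the '== See also ==' heading is ignored.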
'''
return date object from a string like this:
DATE OF BIRTH=February 28, 1948
'''
birth_pattern = re.compile(r'DATE OF BIRTH\s*=\s*([^\s]*.*$)')
delete_chars = re.compile('[,_]*')
delete_comment = re.compile('<!--.*-->')
DATE_FORMATS = ['%B %d %Y', '%d %B %Y', '%Y-%m-%d']
def extract_date_of_birth(line):
    def strip(date_str, date_format):
        try:
            return datetime.strptime(date_str, date_format)
        except Exception as e:
            sys.stderr.write("ERROR WHILE CONVERTING date:%s; format:%s; error:%s\n" % (date_str, date_format, e))
            return None
    date_of_birth = None
    date_str = extract_pattern_from_line(line, birth_pattern)
    if date_str:
        date_str = delete_chars.sub('', date_str).strip()
        date_str = delete_comment.sub('', date_str).strip()
        for date_format in DATE_FORMATS:
            date_of_birth = strip(date_str, date_format)
            if date_of_birth:
                break
    return date_of_birth
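# Illustrative walk-through: given the line '| DATE OF BIRTH = February 28, 1948',
# birth_pattern captures 'February 28, 1948'; stripping commas leaves
# 'February 28 1948', which the '%B %d %Y' format parses into datetime(1948, 2, 28).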
space = re.compile(' ')
BASE_URL = 'http://en.wikipedia.org/wiki/'
CRAWL_POSTFIX = 'Special:Export/'
'''
get the Special:Export url for a name
replace spaces with underscores in the name
'''
def get_url_from_name(name):
    url = BASE_URL + CRAWL_POSTFIX + space.sub('_', name)
    return url.split('|')[0]
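# For example:
#   get_url_from_name('Albert Einstein')
#     -> 'http://en.wikipedia.org/wiki/Special:Export/Albert_Einstein'
# and for a piped wiki link such as 'Lord Kelvin|Kelvin' the split('|')
# keeps only the link target, 'Lord_Kelvin'.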
'''
download the page for a name
extract interesting data from the page
return data in a dictionary
'''
def get_name_data(name, url_cache):
    if not name:
        return None
    result = dict()
    url = get_url_from_name(name)
    url, page = download_url(url, url_cache)
    result['URL'] = url
    text = extract_page_text(page)
    if text:
        for line in text.split('\n'):
            date_of_birth = extract_date_of_birth(line)
            if date_of_birth is not None:
                result['DATE_OF_BIRTH'] = date_of_birth
                result['ZODIAC'] = get_zodiac_of_date(result['DATE_OF_BIRTH'])
                break
    return result
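# The returned dict is shaped like this (illustrative values):
#   {'URL': 'http://en.wikipedia.org/wiki/Special:Export/Paul_Dirac',
#    'DATE_OF_BIRTH': datetime(1902, 8, 8), 'ZODIAC': 'Leo'}
# 'DATE_OF_BIRTH' and 'ZODIAC' are missing when no birth date could be parsed.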
'''
update the set of names from a list url
'''
def update_names(names, list_url, url_cache):
    page = download_url(list_url, url_cache)[1]
    text = extract_page_text(page)
    names.update(extract_names(text) or [])
    save_cache(names, name_list_file_name)
'''
process a collection of names
for each name, get data and save it
'''
def build_name_db(names, url_cache, name_db):
    for name in names:
        name_db[name] = get_name_data(name, url_cache)
        save_cache(name_db, name_db_file_name) #save cache immediately (maybe too eager?)
'''
get list url from command line argument
return the default List_of_physicists url if no command line argument is present
'''
def get_listurl():
    url = 'http://en.wikipedia.org/wiki/Special:Export/List_of_physicists'
    if len(sys.argv) > 1:
        url = sys.argv[1]
    return url
'''
for each distinct zodiac, collect the list of names born under it
return a dictionary keyed by zodiac
'''
def build_zodiacs_db(name_db):
    zodiac_db = dict()
    for name, db in name_db.items():
        if 'ZODIAC' in db:
            zodiac = db['ZODIAC']
            if zodiac not in zodiac_db:
                zodiac_db[zodiac] = []
            zodiac_db[zodiac].append({'NAME': name, 'URL': db['URL']})
    save_cache(zodiac_db, zodiac_db_file_name)
    return zodiac_db
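# The resulting structure (illustrative):
#   {'Leo': [{'NAME': 'Paul Dirac', 'URL': '...'}, ...],
#    'Pis': [{'NAME': '...', 'URL': '...'}, ...]}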
header = '''<!DOCTYPE html>
<html>
<head><title>%s grouped by %s</title></head>
<body>
'''
footer = '''
</body></html>'''
remove_crawl_url = re.compile(CRAWL_POSTFIX)
title_template = '<span style="font-size: x-large;"><a href="%s">%s</a> grouped by %s</span><br />'
category_template = '<br /><span style="font-size: large;">%s</span><br />'
link_template = '<a href="%s">%s</a><br />'
def write_out_zodiacs(zodiac_db, out):
    out.write(header % ('Physicists', 'zodiac'))
    out.write(title_template % (remove_crawl_url.sub('', get_listurl()), 'Physicists', 'zodiac'))
    for zodiac, li in zodiac_db.items():
        out.write(category_template % zodiac)
        for item in li:
            out.write(link_template % (remove_crawl_url.sub('', item['URL']), item['NAME']))
    out.write(footer)
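# A generated link line looks like this (illustrative):
#   <a href="http://en.wikipedia.org/wiki/Paul_Dirac">Paul Dirac</a><br />
# stripping 'Special:Export/' from the crawl URL yields the ordinary article URL.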
def do_whole_process():
    list_url = get_listurl()
    url_cache = get_cache(url_cache_file_name)
    if not url_cache: url_cache = dict()
    names = get_cache(name_list_file_name)
    if not names: names = set()
    update_names(names, list_url, url_cache)
    name_db = get_cache(name_db_file_name)
    if not name_db: name_db = dict()
    build_name_db(names, url_cache, name_db)
    zdb = build_zodiacs_db(name_db)
    write_out_zodiacs(zdb, sys.stdout)
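# Typical invocation (file and list names here are hypothetical), writing HTML to stdout:
#   python zodiac_gist.py > physicists_by_zodiac.html
#   python zodiac_gist.py 'http://en.wikipedia.org/wiki/Special:Export/List_of_chemists' > out.html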
'''
rebuild only the zodiac db from a previously saved name db
'''
def convert_name_db_to_zodiacs_db():
    name_db = get_cache(name_db_file_name)
    build_zodiacs_db(name_db)
'''
render a previously saved zodiac db to stdout
'''
def write_out_zodiacs_db():
    zodiac_db = get_cache(zodiac_db_file_name)
    write_out_zodiacs(zodiac_db, sys.stdout)
if __name__ == '__main__':
    do_whole_process()