Extract data from lolbase.net into a SQLite file.
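Example invocation (the file name scrape.py is illustrative, since the gist does not name the file; the flags are the ones the script below defines):

    python2.6 scrape.py --db_path=lolbase.db --player_name=us/SundownAbsolute --max_players=5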
#! /usr/bin/env python2.6
#
# Copyright 2011 James Aguilar
# Released under the new BSD license.
#
# Dependencies:
# - gflags
# - Beautiful Soup
from BeautifulSoup import BeautifulSoup
import gflags
import re
import sqlite3
import sys
import urllib2

FLAGS = gflags.FLAGS

gflags.DEFINE_string('player_name', 'us/SundownAbsolute',
                     'The player name to fetch, including the domain. '
                     'The domain and name are separated by a slash and are '
                     'not case-sensitive. Example: "us/SundownAbsolute"')
gflags.DEFINE_string('db_path',
                     '',
                     'Path to a database file. A new file will be created '
                     'if the specified file name does not exist.')
gflags.DEFINE_bool('reset', False, 'Reset all tables?')
gflags.DEFINE_integer('max_players',
                      1,
                      'The maximum number of players to fetch.')


def make_db():
  db = sqlite3.connect(FLAGS.db_path)
  db.row_factory = sqlite3.Row
  create_tables(db)
  if len(FLAGS.player_name) > 0:
    db.execute('INSERT OR IGNORE INTO player '
               '(player_name, player_id, last_fetch_time) VALUES '
               '(?, ?, NULL)',
               (FLAGS.player_name,
                get_player_id(FLAGS.player_name)))
    db.commit()

  num_players = 0
  while num_players < FLAGS.max_players:
    # Get some player whose matches have not been fetched yet.
    player_id = None
    player_name = None
    for row in db.execute('SELECT player_id, player_name FROM player '
                          'WHERE last_fetch_time IS NULL LIMIT 1'):
      player_id = row['player_id']
      player_name = row['player_name']
      if player_id is None:
        player_id = get_player_id(player_name)
        db.execute('UPDATE player SET player_id = ? WHERE '
                   'player_name = ?', (player_id, player_name))
    if player_id is not None:  # Skipped when there are no unfetched players.
      # Scrape the player's match list into the match table.
      print('loading matches from %s' % player_name)
      c = db.executemany('INSERT OR IGNORE INTO match '
                         '(match_id, match_fetched) '
                         'VALUES (?, 0)',
                         [(v,) for v in get_matches(player_id)])
      print('loaded %d matches from player %s' %
            (c.rowcount, player_name))
      db.execute('UPDATE player SET last_fetch_time = datetime(\'now\') '
                 'WHERE player_id = ?', (player_id,))
      db.commit()
    num_players += 1

    matches = [row['match_id'] for row in db.execute(
        'SELECT match_id FROM match WHERE match_fetched = 0')]
    print('found %d unloaded matches' % len(matches))
    # Scrape each match this player was involved in that we have
    # not already scraped.
    count = 0
    for match in matches:
      count += 1
      if (count % 50) == 0:
        print('loading match %d' % count)
      load_match(match, db)
      db.commit()
  db.close()


def create_tables(db):
  drop_and_create('match',
                  'id INTEGER PRIMARY KEY, match_id TEXT, '
                  'length_minutes INTEGER, timestamp DATETIME, '
                  'match_fetched BOOLEAN, UNIQUE(match_id)',
                  db)
  drop_and_create('player',
                  'id INTEGER PRIMARY KEY, player_id TEXT, '
                  'player_name TEXT, last_fetch_time DATETIME, '
                  'UNIQUE(player_id), UNIQUE(player_name)',
                  db)
  drop_and_create('champion',
                  'id INTEGER PRIMARY KEY, name TEXT, UNIQUE(name)',
                  db)
  drop_and_create(
      'player_match', """
      id INTEGER PRIMARY KEY,
      player_fk INTEGER, match_fk INTEGER, champion_fk INTEGER,
      team_fk INTEGER,
      level INTEGER, kills INTEGER, deaths INTEGER,
      assists INTEGER, cs INTEGER, barracks INTEGER, turrets INTEGER,
      neutral_cs INTEGER, damage INTEGER, damage_physical INTEGER,
      damage_magical INTEGER, crit INTEGER, spree INTEGER,
      multi INTEGER, damage_taken INTEGER, damage_taken_physical INTEGER,
      damage_taken_magical INTEGER, healing INTEGER, gold INTEGER,
      time_dead REAL, UNIQUE(player_fk, match_fk)""",
      db)
  drop_and_create('team_match',
                  'id INTEGER PRIMARY KEY, match_fk INTEGER, '
                  'winner BOOLEAN, kills INTEGER, deaths INTEGER, '
                  'assists INTEGER, cs INTEGER, turrets INTEGER, '
                  'UNIQUE(match_fk, winner)',
                  db)
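
# Example of reading the data back out once the tables above are populated.
# This is not part of the original scraper; the table and column names come
# from the CREATE statements above, and the aggregate is only illustrative:
#
#   SELECT champion.name,
#          AVG(player_match.kills) AS avg_kills,
#          AVG(player_match.deaths) AS avg_deaths
#   FROM player_match
#   JOIN champion ON champion.id = player_match.champion_fk
#   GROUP BY champion.name
#   ORDER BY avg_kills DESC;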


def drop_and_create(name, colspec, db):
  if FLAGS.reset:
    db.execute('DROP TABLE IF EXISTS %s' % name)
  db.execute('CREATE TABLE IF NOT EXISTS %s (%s)' % (name, colspec))


def get_player_id(player_name):
  page = urllib2.urlopen('http://www.lolbase.net/%s' % player_name).read()
  match = re.search('"http://www.lolbase.net/matches/player/(.*)"', page)
  if match is None:
    raise Exception('no player id found on page: \n' + page)
  return match.group(1)


def get_matches(player_id):
  page_num = 1
  matches = []
  while True:
    try:
      page_matches = parse_matches(fetch_page(player_id, page_num))
      matches.extend(page_matches)
      if len(page_matches) == 0:
        break
    except Exception as e:
      print(str(e))
      break
    page_num += 1
  return matches


def fetch_page(player_id, page_num):
  if page_num == 1:
    url = 'http://www.lolbase.net/matches/player/%s' % player_id
  else:
    url = ('http://www.lolbase.net/matches/player/%s/%d' %
           (player_id, page_num))
  return urllib2.urlopen(url).read()


def parse_matches(page):
  matches = [match.group(1) for match in
             re.finditer('"http://www.lolbase.net/matches/view/(.*)"',
                         page)]
  return matches


def load_match(match_id, db):
  page = fetch_match(match_id)
  soup = BeautifulSoup(page)
  load_player_matches(soup, match_id, db)
  match_data = parse_match_header(soup)
  match_data['match_id'] = match_id
  db.execute('UPDATE match SET match_fetched = 1, '
             ' length_minutes = :duration, '
             ' timestamp = :timestamp '
             'WHERE match_id = :match_id',
             match_data)


def parse_match_header(soup):
  match_header = soup.find('div', {'class': 'match-header'})
  data = {}
  # location_timestamp_text is formatted like "MAP NAME - TIMESTAMP".
  location_timestamp_text = match_header.find(
      'span', {'style': 'margin-left: 16px;', 'class': None}).text
  data['timestamp'] = re.match('.* - (.*)', location_timestamp_text).group(1)
  duration_tag = match_header.find('span', {'class': 'yellow'})
  data['duration'] = int(re.match(r'Duration: (\d*) minutes',
                                  duration_tag.text).group(1))
  return data


def fetch_match(match_id):
  url = 'http://www.lolbase.net/matches/view/%s' % match_id
  return urllib2.urlopen(url).read()


def load_player_matches(soup, match_id, db):
  overview_div = soup.find('div', {'class': 'match-overview'})
  team_keys = load_team_match(overview_div, match_id, db)
  player_tags = overview_div.table.findAll('tr',
                                           {'class': re.compile(r'row\d')})
  for i, tag in enumerate(player_tags):
    match_data = parse_player_game(tag, tag.findNextSibling('tr'))
    # Get the player foreign key.
    player = re.match('http://www.lolbase.net/(.*)',
                      tag.a['href']).group(1)
    match_data['player_fk'] = player_key_from_name(player, db)
    # Get the champion foreign key.
    champion = tag.img['alt']
    if len(champion) == 0:
      champion = resolve_broken_champion(tag.img)
    match_data['champion_fk'] = champion_id(champion, db)
    # The first half of the rows belongs to the first team, the second
    # half to the second team.
    match_data['team_fk'] = (team_keys[0] if i < len(player_tags) / 2
                             else team_keys[1])
    match_data['match_fk'] = match_key_from_name(match_id, db)
    insert_map('player_match', match_data, db)


def load_team_match(overview_div, match_id, db):
  team_keys = []
  for tag in overview_div.findAll('tr', style='padding-bottom: 16px;'):
    team_data = dict(
        kills=int(tag.find('span', title='Total kills').text),
        deaths=int(tag.find('span', title='Total deaths').text),
        assists=int(tag.find('span', title='Total assists').text),
        cs=int(tag.find('em', title=re.compile('.*minions.*')).text),
        turrets=int(tag.find('em', title=re.compile('.*turrets.*')).text),
        winner=(tag.find('span',
                         {'class': re.compile('yellow|red')}).text ==
                'Winning Team'),
        match_fk=match_key_from_name(match_id, db))
    try:
      team_keys.append(insert_map('team_match', team_data, db))
    except Exception:
      print('error trying to insert %s into team_match' % str(team_data))
      raise
  return team_keys


def insert_map(table, map, db):
  # Build 'INSERT INTO table (col1, col2, ...) VALUES (:col1, :col2, ...)'
  # from the dict's keys and bind the dict's values by name.
  columns = ', '.join([key for key in map.iterkeys()])
  binds = ', '.join([':' + key for key in map.iterkeys()])
  stmt = 'INSERT INTO %s (%s) VALUES (%s)' % (table, columns, binds)
  return db.execute(stmt, map).lastrowid


def parse_player_game(overview_tab, detail_tab):
  detail = lambda *a, **kwa: parse_detail_entry(detail_tab, *a, **kwa)
  match_data = {}
  match_data['level'] = int(overview_tab.find('td', {'class': 'green'}).text)
  match_data['kills'] = int(detail('Kills:'))
  match_data['deaths'] = int(detail('Deaths:'))
  match_data['assists'] = int(detail('Assists:'))
  match_data['cs'] = int(detail('Minions slain:'))
  match_data['barracks'] = int(detail('Barracks destroyed:'))
  match_data['turrets'] = int(detail('Turrets destroyed:'))
  match_data['neutral_cs'] = int(detail('Neutral monsters slain:'))
  match_data['damage'] = int(detail('Damage dealt:'))
  match_data['damage_physical'] = int(detail('Physical damage dealt:'))
  match_data['damage_magical'] = int(detail('Magic damage dealt:'))
  match_data['crit'] = int(detail('Largest critical strike:'))
  match_data['spree'] = int(detail('Largest killing spree:'))
  match_data['multi'] = int(detail('Largest multi kill:'))
  match_data['damage_taken'] = int(detail('Damage taken:'))
  match_data['damage_taken_physical'] = int(detail('Physical damage taken:'))
  match_data['damage_taken_magical'] = int(detail('Magic damage taken:'))
  match_data['healing'] = int(detail('Health restored:'))
  match_data['gold'] = int(detail('Gold:'))
  match_data['time_dead'] = float(detail('Time spent dead:',
                                         filter='(.*) min'))
  return match_data


def parse_detail_entry(tag, entry_text, filter=None):
  data_tag = tag.find('td', text=entry_text).parent.findNextSibling('th')
  if filter is None:
    return data_tag.text
  else:
    return re.match(filter, data_tag.text).group(1)


def resolve_broken_champion(tag):
  """lolbase.net is broken for some champions. Some of them are new, but
  inexplicably, some of them are old. When the alt text does not include
  the champion name, the champ is usually still recognizable from the
  img tag's src attribute."""
  for (key, value) in dict(
      BlindMonk='Lee Sin',
      Leblanc='Leblanc',
      Nocturne='Nocturne',
      Irelia='Irelia',
      Caitlyn='Caitlyn',
      JarvanIV='Jarvan IV').iteritems():
    if tag['src'].find(key) != -1:
      return value
  return re.search('small/(.*).png', tag['src']).group(1)


def insert_player(player_name, db):
  db.execute('INSERT OR IGNORE INTO player (player_name) VALUES (?)',
             (player_name,))


def player_key_from_name(player_name, db):
  insert_player(player_name, db)
  # Should be fast since it will be in the cache almost every time.
  for row in db.execute('SELECT id FROM player WHERE player_name = ?',
                        (player_name,)):
    return row['id']
  return None


def match_key_from_name(match_name, db):
  # Should be fast since it will be in the cache almost every time.
  for row in db.execute('SELECT id FROM match WHERE match_id = ?',
                        (match_name,)):
    return row['id']
  return None


def insert_champion(name, db):
  db.execute('INSERT OR IGNORE INTO champion (name) VALUES (?)', (name,))


def champion_id(name, db):
  insert_champion(name, db)
  for row in db.execute('SELECT id FROM champion WHERE name = ?', (name,)):
    return row['id']
  return None


def test_soup():
  # Helper for interactive debugging against a saved page in 'testfile'.
  return BeautifulSoup(open('testfile', 'r').read())


def main(argv):
  try:
    argv = FLAGS(argv)
  except gflags.FlagsError as e:
    print('%s\nUsage: %s ARGS\n%s' % (e, sys.argv[0], FLAGS))
    sys.exit(1)
  make_db()


if __name__ == '__main__':
  main(sys.argv)