Created February 28, 2012 20:40
Russian elections
#!/usr/bin/env python3
import sys
import time
from bs4 import BeautifulSoup
import urllib3
# Shared urllib3 connection pool; retrieve() issues every HTTP request through it.
# NOTE(review): the positional 10 is presumably PoolManager's num_pools — confirm.
pool = urllib3.PoolManager(10)
# 2008 ROOT_URL = ''
# 2012 ROOT_URL = ''
# 2011 ROOT_URL = ''
# 2012
def retrieve(url):
    """Download *url* and return it parsed as a BeautifulSoup document.

    The response body is decoded as cp1251 before parsing.
    NOTE(review): the decode call was garbled in the source and has been
    reconstructed as ``resp.data.decode('cp1251')`` — confirm.
    """
    resp = pool.request('GET', url)
    return BeautifulSoup(resp.data.decode('cp1251'))
def get_tables(url):
    """Locate the header table and the data table on a results page.

    The page is searched for a ``<b>`` tag whose text is 'Сумма'. The
    table enclosing that tag is the header table; the data table is the
    first ``<table>`` inside the next ``<td>`` sibling of the header
    table's parent.

    Returns ``(headers, content)``, or ``(None, None)`` when the page has
    no 'Сумма' marker.
    """
    page = retrieve(url)
    sum_tag = page.find('b', text='Сумма')
    if sum_tag is None:
        return (None, None)
    headers = sum_tag.find_parent('table')
    content = headers.parent.find_next_sibling('td').table
    return (headers, content)
def get_links(url):
    """Return ``(link text, href)`` pairs for every ``<a>`` in the data table.

    The result is always a list (empty when the page has no data table),
    so callers get the same type on both paths.
    """
    _, content = get_tables(url)
    if content is None:
        return []
    return [(tag.text, tag['href']) for tag in content('a')]
def filter_digits(s):
    """Return *s* with every character that is not a digit removed."""
    return ''.join(filter(str.isdigit, s))
def save_uik_data(out_file):
    """Write per-UIK results for every state and TIK to *out_file*.

    Walks root -> state -> TIK through the links on each page. For each
    TIK page, the first table row supplies one UIK label per ``<td>``
    (reduced to its digits); every later row that contains a ``<b>``
    supplies one value per column. Each output line is tab-separated:
    timestamp, state, TIK, UIK number, then that column's values.

    Pages without a results table are reported on stderr and skipped.
    """
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        for tik, tik_url in get_links(state_url):
            print('Getting {0} / {1}'.format(state, tik), file=sys.stderr)
            # real_tik_url = retrieve(tik_url).find('a', text='сайт избирательной комиссии субъекта Российской Федерации')['href']
            # headers, content = get_tables(real_tik_url)
            _, content = get_tables(tik_url)
            if content is None:
                # get_tables() found no 'Сумма' marker; nothing to record.
                print('No results table for {0} / {1}, skipping'.format(state, tik),
                      file=sys.stderr)
                continue
            timestamp = time.time()
            rows = content('tr')
            if not rows:
                continue
            uiks = (filter_digits(tag.text) for tag in rows[0].find_all('td'))
            # One generator per data row; zip() below turns rows into columns.
            transpose = ((tag.text for tag in row.find_all('b'))
                         for row in rows[1:] if row.find('b') is not None)
            for row in zip(uiks, *transpose):
                print(timestamp, state, tik, *row, sep='\t', file=out_file)
def save_tik_data(out_file):
    """Write per-TIK results for every state to *out_file*.

    For each state page, the first table row supplies one TIK label per
    ``<td>`` (whitespace-stripped); every later row that contains a
    ``<b>`` supplies one value per column. Each output line is
    tab-separated: timestamp, state, TIK label, then that column's values.

    Pages without a results table are reported on stderr and skipped.
    """
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        _, content = get_tables(state_url)
        if content is None:
            # get_tables() found no 'Сумма' marker; nothing to record.
            print('No results table for {0}, skipping'.format(state),
                  file=sys.stderr)
            continue
        timestamp = time.time()
        rows = content('tr')
        if not rows:
            continue
        tiks = (tag.text.strip() for tag in rows[0].find_all('td'))
        # One generator per data row; zip() below turns rows into columns.
        transpose = ((tag.text for tag in row.find_all('b'))
                     for row in rows[1:] if row.find('b') is not None)
        for row in zip(tiks, *transpose):
            print(timestamp, state, *row, sep='\t', file=out_file)
if __name__ == '__main__':
    # "-l" selects the long (per-UIK) scrape; the default is per-TIK.
    long_mode = len(sys.argv) > 1 and sys.argv[1] == '-l'
    if long_mode:
        save_data = save_uik_data
        template = 'pres-2012-{0}.uiks.csv'
    else:
        save_data = save_tik_data
        template = 'pres-2012-{0}.tiks.csv'
    # Scrape repeatedly; each pass goes to a new file named after its start time.
    while True:
        start_time = int(time.time())
        with open(template.format(start_time), 'w', encoding='utf-8') as out_file:
            # NOTE(review): the body of this `with` was missing from the
            # source; calling the selected saver is a reconstruction — confirm.
            save_data(out_file)
#!/usr/bin/env python3
# ---------------------------------------------------------------------------
# - Download geographical data for Russian elections
# Written by Alexander Shpilkin <>, March 2012
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# See <> for the complete
# text of the CC0 Public Domain Dedication.
# ---------------------------------------------------------------------------
from urllib.request import urlopen
import json
def progress(fmt, *args, **named):
    """Print the status message ``fmt.format(*args, **named)`` to stdout.

    On a terminal the message is prefixed with a carriage return and the
    ANSI erase-line sequence and no newline is added, so successive calls
    overwrite one status line. Otherwise the message is printed on its own
    line, without doubling a newline that *fmt* already ends with.
    """
    from sys import stdout
    text = fmt.format(*args, **named)
    if hasattr(stdout, 'isatty') and stdout.isatty():
        # No newline is written here, so flush to make the update visible.
        print('\r\033[2K' + text, end='', flush=True)
    else:
        print(text, end=('' if fmt.endswith('\n') else None))
# Every remote resource is addressed as "<path>.json?24".
URL_FORMAT = "{0}.json?24"
ROOT_URL = URL_FORMAT.format('districts')
# Placeholders: object type, shard directory, object id (see object_url).
OBJECT_URL = URL_FORMAT.format('{0}/{1}/{2}')
# Placeholders: number of digits, station number (see search_url).
SEARCH_URL = URL_FORMAT.format('id_search/{0}/{1}')
# Local output file name, one file per object type.
OUT_FORMAT = "{0}.json"
OBJECT_TYPES = [
    'districts',
    'areas',
    'sub_areas',
    'localities',
    'streets',
    'locations',
    'voting_stations',
]
# Maps a reference field name to the object type it points at.
# Districts are intentionally omitted (they can't be downloaded the usual way)
OBJECT_REFS = {
    'area_id': 'areas',
    'sub_area_id': 'sub_areas',
    'locality_id': 'localities',
    'dependent_locality_id': 'localities',
    'street_id': 'streets',
    'location_id': 'locations',
}
def object_url(ty, ident):
    """Return the download URL for object *ident* (a string) of type *ty*.

    The middle path component is *ident* without its last two characters,
    or "0" when *ident* has two characters or fewer.
    """
    assert ty in OBJECT_TYPES
    shard = ident[:-2] if len(ident) > 2 else "0"
    return OBJECT_URL.format(ty, shard, ident)
def search_url(number):
    """Return the search URL for a station *number*.

    The URL carries the number's digit count followed by the number itself.
    """
    digits = str(number)
    return SEARCH_URL.format(len(digits), digits)
def get_json(url):
    """Fetch *url* and return its body parsed as JSON.

    The body is decoded as UTF-8 before parsing.
    NOTE(review): the read/decode call was garbled in the source and has
    been reconstructed as ``response.read().decode('utf-8')`` — confirm.
    """
    with urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))
class Crawler(object):
    """Collects objects by type and id, following references between them.

    ``objects[type][id]`` holds the merged record for each known object.
    ``queue`` holds ``(type, id)`` pairs that have been referenced but not
    yet downloaded; ``crawl()`` drains it.
    """

    def __init__(self):
        self.objects = dict((ty, dict()) for ty in OBJECT_TYPES)
        self.queue = list()

    def touch(self, ty, ident):
        """Register a reference to object *ident* of type *ty*.

        Identifiers are normalized to ``str``. An identifier seen for the
        first time gets a stub record and is queued for download.
        Returns the normalized identifier, or ``None`` if *ident* is ``None``.
        """
        if ident is None:
            return None
        ident = str(ident)
        if ident not in self.objects[ty]:
            self.queue.append((ty, ident))
            self.objects[ty][ident] = {'id': ident}
        return ident

    def update(self, ty, obj):
        """Merge the downloaded record *obj* of type *ty* into the store.

        Embedded objects (``near_stations``, ``location``, and lists keyed
        by an object type name) are stored recursively and replaced in
        *obj* by their ids; reference fields listed in ``OBJECT_REFS`` are
        registered with ``touch()``. *obj* is modified in place.

        When a field's new value differs from the stored one, the triple
        ``(key, old, new)`` is appended under ``'__conflict__'`` and the
        new value wins. Returns the object's normalized id.
        """
        # Normalize ---------------------------------------------------------
        obj['id'] = self.touch(ty, obj['id'])
        obj['type'] = ty

        # Crawl -------------------------------------------------------------
        near = obj.get('near_stations')
        if near is not None:
            obj['near_stations'] = [self.update('voting_stations', st)
                                    for st in near]
        if 'location_id' in obj and 'location' in obj:
            loc = obj.pop('location')
            loc['id'] = obj['location_id']
            obj['location_id'] = self.update('locations', loc)
        for refname, refty in OBJECT_REFS.items():
            # Only fields actually present can be followed.
            if refname in obj:
                obj[refname] = self.touch(refty, obj[refname])
        for childty in OBJECT_TYPES:
            if childty in obj:
                obj[childty] = [self.update(childty, child)
                                for child in obj[childty]]

        # Update the database -----------------------------------------------
        stored = self.objects[ty][obj['id']]
        for key, value in obj.items():
            if stored.get(key, value) != value:
                stored.setdefault('__conflict__', []).append(
                    (key, stored[key], value))
            stored[key] = value
        return obj['id']

    def crawl(self):
        """Download every queued object until the queue is empty.

        A failure for one object is recorded under ``'__error__'`` in its
        record and reported; crawling then continues with the next object.
        """
        i = 0
        while len(self.queue) > 0:
            progress("[*] {0} / {1} objects processed", i, i + len(self.queue))
            ty, ident = self.queue.pop()
            try:
                self.update(ty, get_json(object_url(ty, ident)))
            except Exception as e:
                # Best effort: keep the stub, note why it could not be filled.
                self.objects[ty][ident]['__error__'] = str(e)
                progress("[-] Error processing {0}/{1}: {2}\n", ty, ident, e)
            i += 1
        progress("[+] {0} objects processed\n", i)
def download():
    """Download all objects and save one JSON file per object type.

    Steps: load the district list from ROOT_URL, look up every station
    number in ``range(NUMBER_BOUND)`` through the search endpoint, crawl
    the objects referenced along the way, then dump each type's records
    to ``OUT_FORMAT.format(type)``.

    NOTE(review): several lines of this function were missing from the
    source (the ``try:``, the body of the ``except``, the tail of one
    ``progress`` call, and presumably the ``crawl()`` call); they are
    reconstructed here. NUMBER_BOUND is not defined in the visible source
    and must exist at module level — confirm against the original script.
    """
    data = Crawler()

    progress("[*] Processing districts")
    districts = get_json(ROOT_URL)
    for obj in districts:
        # Pre-register the district under the same str() key that
        # Crawler.touch() uses, so it is never queued for download.
        data.objects['districts'][str(obj['id'])] = {}
        data.update('districts', obj)
    progress("[+] {0} districts processed\n", len(districts))

    count = 0
    for number in range(NUMBER_BOUND):
        progress("[*] {0} / {1} station numbers processed : {2} stations",
                 number, NUMBER_BOUND, count)
        try:
            results = get_json(search_url(number))
        except Exception as e:
            # Best effort: report the failed lookup and move on.
            progress("[-] Error searching for station number {0}: {1}\n",
                     number, e)
            continue
        for station in results:
            data.update('voting_stations', station)
        count += len(results)
    progress("[+] {0} station numbers processed : {1} stations\n",
             NUMBER_BOUND, count)

    # Fetch everything that was referenced but not embedded.
    data.crawl()

    for ty in OBJECT_TYPES:
        progress("[*] Saving {0} {1}", len(data.objects[ty]), ty)
        with open(OUT_FORMAT.format(ty), 'w', encoding='utf-8') as output:
            json.dump(data.objects[ty], output)
        progress("--- {0} {1}\n", len(data.objects[ty]), ty)
if __name__ == '__main__':
    # NOTE(review): the guard's body was missing from the source; download()
    # is the only entry point this script defines — confirm.
    download()
#!/usr/bin/env python3
# ---------------------------------------------------------------------------
# - Generate a text report from geographical data
# Written by Alexander Shpilkin <>, March 2012
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# See <> for the complete
# text of the CC0 Public Domain Dedication.
# ---------------------------------------------------------------------------
import sys
import json
def message(msg, *args, **named):
    """Write ``msg.format(*args, **named)`` to stderr without a trailing newline."""
    text = msg.format(*args, **named)
    print(text, end='', file=sys.stderr)
# Object types to load; each one is read from "<type>.json" and bound to a
# module-level variable of the same name.
# NOTE(review): the end of this list was missing from the source.
# 'voting_stations' is required by generate(); any other lost entries
# cannot be recovered from here — confirm.
TYPES = ['districts', 'areas', 'sub_areas', 'localities', 'locations',
         'voting_stations']

message('[*] Loading: ')
for ty in TYPES:
    message('{0}, ', ty)
    with open(ty + '.json', 'r', encoding='utf-8') as data_file:
        # globals() states the intent directly; writing through locals()
        # only works because this code runs at module level.
        globals()[ty] = json.load(data_file)
def generate():
    """Yield one report row per voting station with a known location.

    Each row is ``[station id, area name, sub-area name, locality name,
    standalone flag]``. A name is ``''`` when the location does not
    reference that kind of object or the reference is unknown. The flag
    is 1 or 0.

    Stations whose ``location_id`` is missing or unknown are skipped and
    counted; the count is reported with ``message()`` once all rows have
    been produced.
    NOTE(review): the body of the ``loc is None`` branch was missing from
    the source; record-and-skip is a reconstruction — confirm.
    """
    bugs = []
    for vid, vdata in voting_stations.items():
        loc = locations.get(vdata.get('location_id'))
        if loc is None:
            bugs.append(vid)
            continue
        row = [vid]
        for table, ref in ((areas, 'area_id'),
                           (sub_areas, 'sub_area_id'),
                           (localities, 'locality_id')):
            # .get() keeps a location record without this field from
            # aborting the whole report.
            target = table.get(loc.get(ref))
            row.append(target['name'] if target is not None else '')
        row.append(1 if vdata['is_standalone'] else 0)
        yield row
    if bugs:
        message('(warning: {0} inconsistent locations)', len(bugs))
if __name__ == '__main__':
    # Write the report as tab-separated rows, one station per line.
    with open('stations.csv', 'w', encoding='utf-8') as output:
        message('[*] Writing')
        for index, row in enumerate(generate()):
            print(*row, sep='\t', file=output)
            if index % 1000 == 0:
                # NOTE(review): this branch's body was missing from the
                # source; a progress dot is a reconstruction — confirm.
                message('.')
    message(' done.\n')
I don't know what is happening, but I just used your script to download the up-to-date data, and 500,000 votes were missing. Do you have any clue why this is happening?
Thank you very much!

