@alexshpilkin
Created February 28, 2012 20:40
Russian elections
#!/usr/bin/env python3
import sys
import time
from bs4 import BeautifulSoup
import urllib3
pool = urllib3.PoolManager(10)
# 2008 ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100022249920&vrn=100100022176412&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100022249920&type=227'
# 2012 ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100031793509&vrn=100100031793505&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100031793509&type=227'
# 2011 ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100028713304&vrn=100100028713299&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100028713304&type=233'
# 2012
ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100031793509&vrn=100100031793505&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100031793509&type=227'

def retrieve(url):
    resp = pool.request('GET', url)
    # Pages are served as windows-1251; an explicit parser is passed so
    # BeautifulSoup does not have to guess one.
    return BeautifulSoup(resp.data.decode('cp1251'), 'html.parser')

def get_tables(url):
    page = retrieve(url)
    # Anchor on the bold 'Сумма' ("Total") cell of the summary table.
    sum_tag = page.find('b', text='Сумма')
    if sum_tag is None:
        return (None, None)
    headers = sum_tag.find_parent('table')
    content = headers.parent.find_next_sibling('td').table
    return (headers, content)
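
# Layout note (inferred from the selectors above, not documented anywhere):
# the summary is rendered as two sibling <td>s. The left one holds the row
# headers, including the bold 'Сумма' cell used as the anchor; the right one
# holds the per-commission figures that are scraped below.
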
def get_links(url):
    headers, content = get_tables(url)
    if content is None:
        return []
    else:
        return ((tag.text, tag['href']) for tag in content('a'))

def filter_digits(s):
    return ''.join(ch for ch in s if ch.isdigit())
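
# For example, a column header such as 'УИК №1234' (the exact wording used by
# the site is an assumption here) reduces to '1234'; this is how the UIK
# headers become bare precinct numbers in save_uik_data below.
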
def save_uik_data(out_file):
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        for tik, tik_url in get_links(state_url):
            print('Getting {0} / {1}'.format(state, tik), file=sys.stderr)
            # real_tik_url = retrieve(tik_url).find('a', text='сайт избирательной комиссии субъекта Российской Федерации')['href']
            # headers, content = get_tables(real_tik_url)
            headers, content = get_tables(tik_url)
            if content is None:  # page without a summary table; skip it
                continue
            timestamp = time.time()
            uiks = (filter_digits(tag.text)
                    for tag in content('tr')[0].find_all('td'))
            transpose = ((tag.text for tag in row.find_all('b'))
                         for row in content('tr')[1:]
                         if row.find('b') is not None)
            for row in zip(uiks, *transpose):
                print(timestamp, state, tik, *row, sep='\t', file=out_file)

def save_tik_data(out_file):
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        headers, content = get_tables(state_url)
        if content is None:  # page without a summary table; skip it
            continue
        timestamp = time.time()
        tiks = (tag.text.strip() for tag in content('tr')[0].find_all('td'))
        transpose = ((tag.text for tag in row.find_all('b'))
                     for row in content('tr')[1:]
                     if row.find('b') is not None)
        for row in zip(tiks, *transpose):
            print(timestamp, state, *row, sep='\t', file=out_file)

if __name__ == '__main__':
    long_mode = len(sys.argv) > 1 and sys.argv[1] == '-l'
    if long_mode:
        save_data = save_uik_data
        template = 'pres-2012-{0}.uiks.csv'
    else:
        save_data = save_tik_data
        template = 'pres-2012-{0}.tiks.csv'
    while True:
        start_time = int(time.time())
        with open(template.format(start_time), 'w', encoding='utf-8') as out_file:
            save_data(out_file)
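
# Usage sketch, inferred from the argument handling above (the file name
# cikrf.py comes from the discussion at the bottom and is otherwise an
# assumption):
#
#     $ python3 cikrf.py      # TIK-level totals -> pres-2012-<ts>.tiks.csv
#     $ python3 cikrf.py -l   # UIK-level totals -> pres-2012-<ts>.uiks.csv
#
# The while-True loop writes a fresh timestamped snapshot on every pass and
# runs until interrupted.
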
#!/usr/bin/env python3
# ---------------------------------------------------------------------------
# crawl.py - Download geographical data for Russian elections
#
# Written by Alexander Shpilkin <ashpilkin@gmail.com>, March 2012
#
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
#
# See <http://creativecommons.org/publicdomain/zero/1.0/> for the complete
# text of the CC0 Public Domain Dedication.
# ---------------------------------------------------------------------------
from urllib.request import urlopen
import json
def progress(fmt, *args, **named):
    from sys import stdout
    if hasattr(stdout, 'isatty') and stdout.isatty():
        # On a terminal, '\r' plus the 'erase line' escape rewrites the
        # current line in place.
        print('\r\033[2K' + fmt.format(*args, **named), end='')
    else:
        print(fmt.format(*args, **named), end=('' if fmt.endswith('\n') else None))
    stdout.flush()

URL_FORMAT = "http://webvybory2012.ru/json/{0}.json?24"
ROOT_URL = URL_FORMAT.format('districts')
OBJECT_URL = URL_FORMAT.format('{0}/{1}/{2}')
SEARCH_URL = URL_FORMAT.format('id_search/{0}/{1}')
NUMBER_BOUND = 6000
OUT_FORMAT = "{0}.json"
OBJECT_TYPES = ['districts', 'areas', 'sub_areas', 'localities', 'streets',
                'locations', 'voting_stations']
# Districts are intentionally omitted (they can't be downloaded the usual way)
OBJECT_REFS = {'area_id': 'areas', 'sub_area_id': 'sub_areas',
               'locality_id': 'localities', 'dependent_locality_id': 'localities',
               'street_id': 'streets', 'location_id': 'locations'}

def object_url(ty, ident):
    assert ty in OBJECT_TYPES
    return OBJECT_URL.format(ty, ident[:-2] if len(ident) > 2 else "0", ident)
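
# For example, object_url('areas', '145') yields
#   http://webvybory2012.ru/json/areas/1/145.json?24
# i.e. objects are sharded into directories named after the identifier with
# its last two digits stripped ("0" for identifiers of one or two digits).
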
def search_url(number):
    number = str(number)
    return SEARCH_URL.format(len(number), number)
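
# For example, search_url(1234) yields
#   http://webvybory2012.ru/json/id_search/4/1234.json?24
# so the search endpoint is keyed by the digit count of the station number
# as well as by the number itself.
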
def get_json(url):
    with urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))

class Crawler(object):
    def __init__(self):
        self.objects = dict((ty, dict()) for ty in OBJECT_TYPES)
        self.queue = list()

    def touch(self, ty, ident):
        if ident is None:
            return None
        ident = str(ident)
        if ident not in self.objects[ty]:
            self.queue.append((ty, ident))
            self.objects[ty][ident] = {'id': ident}
        return ident

    def update(self, ty, obj):
        # Normalize ---------------------------------------------------------
        obj['id'] = self.touch(ty, obj['id'])
        obj['type'] = ty
        # Crawl -------------------------------------------------------------
        if 'near_stations' in obj and obj['near_stations'] is not None:
            obj['near_stations'] = [self.update('voting_stations', st)
                                    for st in obj['near_stations']]
        if 'location_id' in obj and 'location' in obj:
            loc = obj.pop('location')
            loc['id'] = obj['location_id']
            obj['location_id'] = self.update('locations', loc)
        for refname, refty in OBJECT_REFS.items():
            if refname not in obj:
                continue
            obj[refname] = self.touch(refty, obj[refname])
        for childty in OBJECT_TYPES:
            if childty not in obj:
                continue
            obj[childty] = [self.update(childty, child)
                            for child in obj[childty]]
        # Update the database -----------------------------------------------
        stored = self.objects[ty][obj['id']]
        for key, value in obj.items():
            if stored.get(key, value) != value:
                if '__conflict__' not in stored:
                    stored['__conflict__'] = list()
                stored['__conflict__'].append((key, stored[key], value))
            stored[key] = value
        return obj['id']

    def crawl(self):
        i = 0
        while len(self.queue) > 0:
            progress("[*] {0} / {1} objects processed", i, i + len(self.queue))
            ty, ident = self.queue.pop()
            try:
                self.update(ty, get_json(object_url(ty, ident)))
            except Exception as e:
                self.objects[ty][ident]['__error__'] = str(e)
                progress("[-] Error processing {0}/{1}: {2}\n", ty, ident, e)
            i += 1
        progress("[+] {0} objects processed\n", i)
def download():
    data = Crawler()
    progress("[*] Processing districts")
    districts = get_json(ROOT_URL)
    for obj in districts:
        # Pre-seed districts so touch() never queues them for download (they
        # can't be fetched through object_url, see above); the id is
        # stringified to match the keys touch() uses.
        data.objects['districts'][str(obj['id'])] = {}
        data.update('districts', obj)
    progress("[+] {0} districts processed\n", len(districts))
    count = 0
    for number in range(NUMBER_BOUND):
        progress("[*] {0} / {1} station numbers processed : {2} stations",
                 number, NUMBER_BOUND, count)
        try:
            results = get_json(search_url(number))
        except Exception:
            pass  # numbers with no stations are expected; skip them
        else:
            for station in results:
                data.update('voting_stations', station)
            count += len(results)
    progress("[+] {0} station numbers processed : {1} stations\n",
             NUMBER_BOUND, count)
    data.crawl()
    for ty in OBJECT_TYPES:
        progress("[*] Saving {0} {1}", len(data.objects[ty]), ty)
        with open(OUT_FORMAT.format(ty), 'w') as output:
            json.dump(data.objects[ty], output)
        progress("--- {0} {1}\n", len(data.objects[ty]), ty)

if __name__ == '__main__':
    download()
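
# Running this file fetches the whole hierarchy and writes one JSON file per
# object type (districts.json, areas.json, ..., voting_stations.json) into
# the current directory, each a dict keyed by object id; failed downloads are
# kept with an '__error__' field rather than dropped.
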
#!/usr/bin/env python3
# ---------------------------------------------------------------------------
# report.py - Generate a text report from geographical data
#
# Written by Alexander Shpilkin <ashpilkin@gmail.com>, March 2012
#
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
#
# See <http://creativecommons.org/publicdomain/zero/1.0/> for the complete
# text of the CC0 Public Domain Dedication.
# ---------------------------------------------------------------------------
import sys
import json
def message(msg, *args, **named):
    print(msg.format(*args, **named), end='', file=sys.stderr)
    sys.stderr.flush()

TYPES = ['districts', 'areas', 'sub_areas', 'localities', 'locations',
         'voting_stations']

message('[*] Loading: ')
for ty in TYPES:
    message('{0}, ', ty)
    with open(ty + '.json', 'r', encoding='utf-8') as data_file:
        # At module level locals() is globals(), but globals() states the
        # intent (binding each table to a module-level name) explicitly.
        globals()[ty] = json.load(data_file)
message('done.\n')

def generate():
    bugs = []
    for vid, vdata in voting_stations.items():
        row = [vid]
        loc = locations.get(vdata.get('location_id'))
        if loc is None:
            bugs.append(vid)
            continue
        # Reference keys may be absent from the data, hence .get() throughout.
        area = areas.get(loc.get('area_id'))
        row.append(area['name'] if area is not None else '')
        sub = sub_areas.get(loc.get('sub_area_id'))
        row.append(sub['name'] if sub is not None else '')
        lty = localities.get(loc.get('locality_id'))
        row.append(lty['name'] if lty is not None else '')
        row.append(vdata['name'])
        row.append(vdata['address'])
        row.append(loc['lat'])
        row.append(loc['lon'])
        row.append(1 if vdata['is_standalone'] else 0)
        yield row
    if len(bugs) > 0:
        message('(warning: {0} inconsistent locations)', len(bugs))

if __name__ == '__main__':
    with open('stations.csv', 'w', encoding='utf-8') as output:
        message('[*] Writing')
        for index, row in enumerate(generate()):
            print(*row, sep='\t', file=output)
            if index % 1000 == 0:
                message('.')
        message(' done.\n')
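
# Running this file expects the *.json files produced by crawl.py in the
# current directory and writes stations.csv, one tab-separated row per voting
# station: id, area, sub-area, locality, name, address, latitude, longitude,
# and a 1/0 standalone flag.
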
@matpestana

I don't know what is happening, but I just used your script cikrf.py to download the up-to-date data, and 500,000 votes were missing. Do you have any clue why this is happening?
Thank you very much!
