Created
February 28, 2012 20:40
-
-
Save alexshpilkin/1934969 to your computer and use it in GitHub Desktop.
Russian elections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import time | |
from bs4 import BeautifulSoup | |
import urllib3 | |
# Shared urllib3 connection pool (at most 10 cached connections).
pool = urllib3.PoolManager(10)

# Summary-table URLs for federal elections on www.vybory.izbirkom.ru.
# NOTE: the original copies were mangled by HTML entity decoding --
# '&region=' had turned into '(R)ion=' ('&reg' misread as the &reg;
# entity); '&region=' is restored below.
# 2008 ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100022249920&vrn=100100022176412&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100022249920&type=227'
# 2011 ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100028713304&vrn=100100028713299&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100028713304&type=233'
# 2012 presidential election:
ROOT_URL = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100031793509&vrn=100100031793505&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100031793509&type=227'
def retrieve(url):
    """Fetch *url* and return the page parsed as a BeautifulSoup tree.

    Pages on izbirkom.ru are served in the cp1251 (Windows-1251) encoding.
    """
    resp = pool.request('GET', url)
    # Name the parser explicitly: without it bs4 picks whichever parser
    # happens to be installed, which can change the parse tree (and emits
    # a warning on modern bs4 versions).
    return BeautifulSoup(resp.data.decode('cp1251'), 'html.parser')
def get_tables(url):
    """Locate the results tables on a summary page.

    Returns a (header_table, content_table) pair, or (None, None) when
    the page carries no summary table.
    """
    page = retrieve(url)
    # The bold caption 'Сумма' (total) anchors the header table.
    anchor = page.find('b', text='Сумма')
    if anchor is None:
        return (None, None)
    header_table = anchor.find_parent('table')
    sibling_cell = header_table.parent.find_next_sibling('td')
    return (header_table, sibling_cell.table)
def get_links(url):
    """Yield (link text, href) pairs from the content table of *url*.

    Yields nothing when the page has no content table.
    """
    _, content = get_tables(url)
    if content is None:
        return []
    return ((anchor.text, anchor['href']) for anchor in content('a'))
def filter_digits(s):
    """Return *s* with every non-digit character removed."""
    return ''.join(filter(str.isdigit, s))
def save_uik_data(out_file):
    """Download per-UIK (precinct) results and write TSV rows to *out_file*.

    Each row: timestamp, region, TIK name, UIK number, then the result
    columns in page order.
    """
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        for tik, tik_url in get_links(state_url):
            print('Getting {0} / {1}'.format(state, tik), file=sys.stderr)
            headers, content = get_tables(tik_url)
            if content is None:
                # get_tables returns (None, None) when the page has no
                # summary table; skip instead of crashing on content('tr').
                print('No data for {0} / {1}'.format(state, tik),
                      file=sys.stderr)
                continue
            timestamp = time.time()
            # First row holds the UIK numbers, one per cell.
            uiks = (filter_digits(td.text)
                    for td in content('tr')[0].find_all('td'))
            # Remaining rows hold the figures (in <b> tags), one row per
            # indicator -- transposed relative to the output we want.
            transpose = ((b.text for b in row.find_all('b'))
                         for row in content('tr')[1:]
                         if row.find('b') is not None)
            for row in zip(uiks, *transpose):
                print(timestamp, state, tik, *row, sep='\t', file=out_file)
def save_tik_data(out_file):
    """Download per-TIK (territorial commission) results as TSV rows.

    Each row: timestamp, region, TIK name, then the result columns in
    page order.
    """
    print(' * * *')
    print('Getting root list', file=sys.stderr)
    for state, state_url in get_links(ROOT_URL):
        print('Getting {0}'.format(state), file=sys.stderr)
        headers, content = get_tables(state_url)
        if content is None:
            # get_tables returns (None, None) when the page has no summary
            # table; skip this region instead of crashing on content('tr').
            print('No data for {0}'.format(state), file=sys.stderr)
            continue
        timestamp = time.time()
        # First row: TIK names; remaining rows: figures in <b> tags.
        tiks = (td.text.strip() for td in content('tr')[0].find_all('td'))
        transpose = ((b.text for b in row.find_all('b'))
                     for row in content('tr')[1:]
                     if row.find('b') is not None)
        for row in zip(tiks, *transpose):
            print(timestamp, state, *row, sep='\t', file=out_file)
if __name__ == '__main__':
    # '-l' ("long" mode) scrapes per-precinct (UIK) data; the default is
    # the much smaller per-TIK data set.
    long_mode = len(sys.argv) > 1 and sys.argv[1] == '-l'
    if long_mode:
        save_data = save_uik_data
        template = 'pres-2012-{0}.uiks.csv'
    else:
        save_data = save_tik_data
        template = 'pres-2012-{0}.tiks.csv'
    # Snapshot loop: re-download the full data set forever, writing one
    # timestamped file per pass.  NOTE(review): there is no pause between
    # passes -- each snapshot starts as soon as the previous one finishes.
    while True:
        start_time = int(time.time())
        with open(template.format(start_time), 'w', encoding='utf-8') as out_file:
            save_data(out_file)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# --------------------------------------------------------------------------- | |
# crawl.py - Download geographical data for Russian elections | |
# | |
# Written by Alexander Shpilkin <ashpilkin@gmail.com>, March 2012 | |
# | |
# To the extent possible under law, the author has dedicated all copyright | |
# and related and neighboring rights to this software to the public domain | |
# worldwide. This software is distributed without any warranty. | |
# | |
# See <http://creativecommons.org/publicdomain/zero/1.0/> for the complete | |
# text of the CC0 Public Domain Dedication. | |
# --------------------------------------------------------------------------- | |
from urllib.request import urlopen | |
import json | |
def progress(fmt, *args, **named):
    """Print a progress message, overwriting the current line on a TTY."""
    from sys import stdout
    text = fmt.format(*args, **named)
    if hasattr(stdout, 'isatty') and stdout.isatty():
        # Carriage return + ANSI "erase line", then redraw in place.
        print('\r\033[2K' + text, end='')
    else:
        # Non-interactive output: finish each message with a newline
        # unless the format string already supplies one.
        print(text, end=('' if fmt.endswith('\n') else None))
    stdout.flush()
# Every data set is a JSON document under this root; the trailing "?24"
# is presumably a cache-busting version tag -- TODO confirm.
URL_FORMAT = "http://webvybory2012.ru/json/{0}.json?24"
ROOT_URL = URL_FORMAT.format('districts')
# {0} = object type, {1} = shard (id minus its last two digits), {2} = id.
OBJECT_URL = URL_FORMAT.format('{0}/{1}/{2}')
# {0} = digit count of the station number, {1} = the number itself.
SEARCH_URL = URL_FORMAT.format('id_search/{0}/{1}')
# Station numbers are brute-forced over range(NUMBER_BOUND).
NUMBER_BOUND = 6000
# One output file per object type, named "<type>.json".
OUT_FORMAT = "{0}.json"
OBJECT_TYPES = ['districts', 'areas', 'sub_areas', 'localities', 'streets',
                'locations', 'voting_stations']
# Foreign-key field name -> type of the object it references.
# Districts are intentionally omitted (they can't be downloaded the usual way)
OBJECT_REFS = {'area_id': 'areas', 'sub_area_id': 'sub_areas',
               'locality_id': 'localities', 'dependent_locality_id': 'localities',
               'street_id': 'streets', 'location_id': 'locations'}
def object_url(ty, ident):
    """Build the JSON URL for object *ident* of type *ty*.

    Objects are sharded by id with the last two digits stripped; one- and
    two-digit ids fall into shard "0".
    """
    assert ty in OBJECT_TYPES
    shard = ident[:-2] if len(ident) > 2 else "0"
    return OBJECT_URL.format(ty, shard, ident)
def search_url(number):
    """Build the id-search URL for station *number* (int or str)."""
    digits = str(number)
    return SEARCH_URL.format(len(digits), digits)
def get_json(url):
    """Fetch *url* and decode its body as UTF-8 JSON."""
    with urlopen(url) as response:
        body = response.read()
    return json.loads(body.decode('utf-8'))
class Crawler(object):
    """Mirrors the webvybory2012.ru object graph into memory.

    self.objects maps each object type to an id -> record dict;
    self.queue holds (type, id) pairs that have been referenced but whose
    records have not been downloaded yet.
    """

    def __init__(self):
        # One id -> record dict per object type.
        self.objects = dict((ty, dict()) for ty in OBJECT_TYPES)
        # Pending (type, id) pairs; crawl() drains this LIFO.
        self.queue = list()

    def touch(self, ty, ident):
        """Register that object *ident* of type *ty* exists.

        Returns the id normalized to a string (or None for None input).
        A first-seen id gets a stub record and is queued for download.
        """
        if ident is None:
            return None
        ident = str(ident)
        if ident not in self.objects[ty]:
            self.queue.append((ty, ident))
            self.objects[ty][ident] = {'id': ident}
        return ident

    def update(self, ty, obj):
        """Merge a freshly downloaded *obj* of type *ty* into the database.

        Embedded child objects are recursively stored under their own type
        and replaced by their ids; conflicting field values are preserved
        under a '__conflict__' key rather than silently overwritten.
        Returns the object's normalized id.
        """
        # Normalize ---------------------------------------------------------
        obj['id'] = self.touch(ty, obj['id'])
        obj['type'] = ty
        # Crawl -------------------------------------------------------------
        # Inline "nearby station" records become plain id references.
        if 'near_stations' in obj and obj['near_stations'] is not None:
            obj['near_stations'] = [self.update('voting_stations', st)
                for st in obj['near_stations']]
        # An embedded location blob carries no id of its own; give it the
        # id from location_id before storing it as a separate object.
        if 'location_id' in obj and 'location' in obj:
            loc = obj.pop('location')
            loc['id'] = obj['location_id']
            obj['location_id'] = self.update('locations', loc)
        # Plain *_id foreign keys: normalize and queue the referenced object.
        for refname, refty in OBJECT_REFS.items():
            if refname not in obj:
                continue
            obj[refname] = self.touch(refty, obj[refname])
        # Embedded child lists keyed by their type name (e.g. 'areas').
        for childty in OBJECT_TYPES:
            if childty not in obj:
                continue
            obj[childty] = [self.update(childty, child)
                for child in obj[childty]]
        # Update the database -----------------------------------------------
        stored = self.objects[ty][obj['id']]
        for key, value in obj.items():
            if stored.get(key, value) != value:
                # Differing value for a key we already hold: keep every
                # version under '__conflict__' for later inspection.
                if '__conflict__' not in stored:
                    stored['__conflict__'] = list()
                stored['__conflict__'].append((key, stored[key], value))
            stored[key] = value
        return obj['id']

    def crawl(self):
        """Drain the queue, downloading and merging every referenced object.

        A failed download is recorded on the object's record under
        '__error__' instead of aborting the whole crawl.
        """
        i = 0
        while len(self.queue) > 0:
            progress("[*] {0} / {1} objects processed", i, i + len(self.queue))
            ty, ident = self.queue.pop()
            try:
                self.update(ty, get_json(object_url(ty, ident)))
            except Exception as e:
                self.objects[ty][ident]['__error__'] = str(e)
                progress("[-] Error processing {0}/{1}: {2}\n", ty, ident, e)
            i += 1
        progress("[+] {0} objects processed\n", i)
def download():
    """Download the whole data set and dump one JSON file per object type."""
    data = Crawler()

    # Districts come from a dedicated root document rather than the usual
    # per-object URLs.
    progress("[*] Processing districts")
    districts = get_json(ROOT_URL)
    for district in districts:
        data.objects['districts'][district['id']] = {}
        data.update('districts', district)
    progress("[+] {0} districts processed\n", len(districts))

    # Seed voting stations by brute-forcing the id-search endpoint over
    # every possible station number.
    station_count = 0
    for number in range(NUMBER_BOUND):
        progress("[*] {0} / {1} station numbers processed : {2} stations",
                 number, NUMBER_BOUND, station_count)
        try:
            found = get_json(search_url(number))
        except Exception:
            # Best effort: numbers with no stations fail to fetch; skip them.
            continue
        for station in found:
            data.update('voting_stations', station)
        station_count += len(found)
    progress("[+] {0} station numbers processed : {1} stations\n",
             NUMBER_BOUND, station_count)

    # Follow every reference queued while merging the seeds.
    data.crawl()

    # Persist each object table as "<type>.json".
    for ty in OBJECT_TYPES:
        progress("[*] Saving {0} {1}", len(data.objects[ty]), ty)
        with open(OUT_FORMAT.format(ty), 'w') as output:
            json.dump(data.objects[ty], output)
        progress("--- {0} {1}\n", len(data.objects[ty]), ty)
if __name__ == '__main__':
    # Entry point: crawl everything and write the per-type JSON dumps.
    download()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# --------------------------------------------------------------------------- | |
# report.py - Generate a text report from geographical data | |
# | |
# Written by Alexander Shpilkin <ashpilkin@gmail.com>, March 2012 | |
# | |
# To the extent possible under law, the author has dedicated all copyright | |
# and related and neighboring rights to this software to the public domain | |
# worldwide. This software is distributed without any warranty. | |
# | |
# See <http://creativecommons.org/publicdomain/zero/1.0/> for the complete | |
# text of the CC0 Public Domain Dedication. | |
# --------------------------------------------------------------------------- | |
import sys | |
import json | |
def message(msg, *args, **named):
    """Write a formatted status message to stderr and flush immediately."""
    text = msg.format(*args, **named)
    print(text, end='', file=sys.stderr)
    sys.stderr.flush()
# Object types to load; 'streets' is absent because generate() below never
# reads street data.
TYPES = ['districts', 'areas', 'sub_areas', 'localities', 'locations',
         'voting_stations']

message('[*] Loading: ')
for ty in TYPES:
    message('{0}, ', ty)
    with open(ty + '.json', 'r', encoding='utf-8') as data_file:
        # Bind each table as a module-level name (districts, areas, ...).
        # Use globals() explicitly: writing to locals() only works here
        # because module-level locals() happens to BE globals(); inside a
        # function the same code would silently do nothing.
        globals()[ty] = json.load(data_file)
message('done.\n')
def generate():
    """Yield one report row per voting station.

    Row layout: station id, area name, sub-area name, locality name,
    station name, address, latitude, longitude, standalone flag (1/0).
    Stations whose location record is missing are skipped; a warning with
    their count is emitted at the end.
    """
    bugs = []
    for vid, vdata in voting_stations.items():
        loc = locations.get(vdata.get('location_id'))
        if loc is None:
            # Inconsistent data: the station references a location that
            # was never downloaded.
            bugs.append(vid)
            continue
        row = [vid]
        # Use .get() for the reference fields: crawled records carry only
        # the keys the site returned, so any of these may be absent
        # (a direct loc['area_id'] would raise KeyError).
        area = areas.get(loc.get('area_id'))
        row.append(area['name'] if area is not None else '')
        sub = sub_areas.get(loc.get('sub_area_id'))
        row.append(sub['name'] if sub is not None else '')
        lty = localities.get(loc.get('locality_id'))
        row.append(lty['name'] if lty is not None else '')
        row.append(vdata['name'])
        row.append(vdata['address'])
        row.append(loc['lat'])
        row.append(loc['lon'])
        row.append(1 if vdata['is_standalone'] else 0)
        yield row
    if len(bugs) > 0:
        message('(warning: {0} inconsistent locations)', len(bugs))
if __name__ == '__main__':
    # NOTE(review): the output is tab-separated despite the .csv extension.
    with open('stations.csv', 'w', encoding='utf-8') as output:
        message('[*] Writing')
        for index, row in enumerate(generate()):
            print(*row, sep='\t', file=output)
            # One progress dot per 1000 rows.
            if index % 1000 == 0:
                message('.')
        message(' done.\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I don't know what is going wrong, but I just used your script cikrf.py to download the up-to-date data, and 500,000 votes were missing. Do you have any idea why this is happening?
Thank you very much!