Last active
April 14, 2019 20:12
-
-
Save alexshpilkin/388d74776714f7b9e590a8b39094bf93 to your computer and use it in GitHub Desktop.
Aggregate multiple CVK scrapes into precinct histories
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all) | |
from csv import DictReader | |
from datetime import datetime as DateTime | |
from os import scandir | |
from simplejsonseq import dump | |
from sys import argv, stderr, stdout | |
import ietfcsv # ietf-tab CSV dialect | |
# strptime pattern that the name of every scrape file must match
NAME = '%Y%m%dT%H%MZ.tsv'
# Columns that identify a precinct: they are kept on the aggregated row
# itself, while all remaining columns go into the row's history entries
HEAD = (
    'region №',
    'region',
    'ТВО №',
    'Центр ТВО',
    '№ ВД',
)
def without(dict, *keys):
    """Return a new dictionary holding the items of *dict* minus *keys*.

    Names in *keys* that do not occur in *dict* are ignored.  The argument
    is left untouched and the relative order of the kept items is preserved.
    """
    # NB the first parameter keeps its original name (it shadows the builtin)
    dropped = frozenset(keys)
    kept = {}
    for name, value in dict.items():
        if name in dropped:
            continue
        kept[name] = value
    return kept
rows = {}  # aggregated data, keyed by (ТВО №, № ВД)
keys = []  # final row order


def trace(*args, **named):
    """Progress output hook; stays a no-op unless -v is the first argument."""


if argv[1:2] == ['-v']:
    def trace(*args, **named):
        """Print the arguments to stderr, like print() would."""
        print(*args, **named, file=stderr)
# Fold every *.tsv scrape in the current directory into `rows`, visiting the
# files in file-name order.  A precinct gets a new history entry only when
# its data differs from the most recent entry already recorded.
for entry in sorted(scandir(), key=lambda e: e.name):
    if not entry.name.endswith('.tsv'):
        continue
    trace(entry.name, end=': ')
    # newline='\r\n': only CRLF terminates a line when reading the scrape
    with open(entry.name, newline='\r\n') as tsv:
        keys = []  # restarted per file: output order follows the last scrape
        time = DateTime.strptime(entry.name, NAME)
        stamp = time.strftime('%Y-%m-%dT%H:%MZ')  # identical for all lines
        prevlen, updated = len(rows), 0
        for line in DictReader(tsv, dialect='ietf-tab'):
            # Drop the two polling-station count columns; they are not kept
            # in the aggregated history
            del line['Кількість виборчих дільниць в окрузі']
            del line['Кількість виборчих дільниць щодо яких '
                     'надійшли відомості']
            assert 'timestamp' not in line
            line['timestamp'] = stamp
            # Split the line: HEAD columns form the row, everything else
            # (plus the timestamp) becomes its first history entry
            row = {k: line.pop(k) for k in HEAD}
            row['history'] = [line]
            key = (row['ТВО №'], int(row['№ ВД']))
            keys.append(key)
            row = rows.setdefault(key, row)
            # For a newly inserted key the last history entry is `line`
            # itself, so this fires only when a known precinct has changed
            if (without(row['history'][-1], 'timestamp') !=
                    without(line, 'timestamp')):
                row['history'].append(line)
                updated += 1
    inserted = len(rows) - prevlen
    trace('{} lines, {} inserted, {} updated'
          .format(len(keys), inserted, updated))
    if inserted == 0 and updated == 0:
        print('warning: {}: no changes'.format(entry.name),
              file=stderr)
    # Build the set once: membership tests below are O(1), and comparing
    # distinct keys (not the raw line count) means a duplicated line in the
    # scrape cannot hide a precinct that is actually missing from it
    seen = set(keys)
    assert seen <= set(rows)
    if len(seen) < len(rows):
        for k, row in rows.items():
            if k in seen:
                continue
            print('warning: {}: ТВО {}, ВД {} missing'
                  .format(entry.name,
                          row['ТВО №'],
                          row['№ ВД']),
                  file=stderr)
# Convert each history list into a mapping from timestamp to the data that
# was recorded at that time.
# Not {r.pop('timestamp'): r for r ...} because CPython evaluates
# dictionary comprehensions in the wrong order (#29652)
for row in rows.values():
    history = {}
    for record in row['history']:
        history[record['timestamp']] = without(record, 'timestamp')
    row['history'] = history

# Emit the rows in the order recorded in `keys`, with CRLF line endings
stdout.reconfigure(newline='\r\n')
ordered = (rows[k] for k in keys)
dump(ordered, stdout, ensure_ascii=False, indent='\t')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from csv import reader as Reader, writer as Writer | |
from datetime import datetime as DateTime | |
from io import TextIOWrapper | |
from os import scandir | |
from sys import argv, stderr | |
from zipfile import ZipFile | |
import ietfcsv # ietf-tab-compat CSV dialect | |
# strptime pattern that the name of every input archive must match
ZIPNAME = 'voting_results_%Y_%m_%d_%H_%M_GMT.zip'
# strftime pattern used to name the TSV file written for each archive
TSVNAME = '%Y%m%dT%H%MZ.tsv'
# Zero-based positions of the 'ТВО №' and '№ ВД' columns
TVOCOL, VDCOL = 2, 6
def trace(*args, **named):
    """Progress output hook; stays a no-op unless -v is the first argument."""


if argv[1:2] == ['-v']:
    def trace(*args, **named):
        """Print the arguments to stderr, like print() would."""
        print(*args, **named, file=stderr)
# Header of the first archive processed; it is written to every output file
head = None

# Convert precincts.txt from each *.zip in the current directory into a TSV
# file named after the archive's timestamp, in reverse file-name order.
entries = sorted(scandir(), key=lambda e: e.name)
entries.reverse()
for entry in entries:
    if not entry.name.endswith('.zip'):
        continue
    trace(entry.name, end=' -> ')
    time = DateTime.strptime(entry.name, ZIPNAME)
    with ZipFile(entry.name) as archive, \
            TextIOWrapper(archive.open('precincts.txt')) as txt, \
            open(time.strftime(TSVNAME), 'w', newline='\r\n') as tsv:
        trace(tsv.name)
        source = Reader(txt, delimiter='\t')
        sink = Writer(tsv, dialect='ietf-tab-compat')

        header = next(source)
        assert header[TVOCOL] == 'ТВО №' and header[VDCOL] == '№ ВД'
        if head is None:
            head = header
        elif head != header:
            # Same number of columns but different names: report each one
            # that is replaced by the name from the first archive
            assert len(head) == len(header)
            for wanted, found in zip(head, header):
                if wanted != found:
                    print("warning: {}: renaming column '{}' to '{}'"
                          .format(entry.name, found, wanted),
                          file=stderr)
        sink.writerow(head)

        for row in source:
            # Anything after a line break in the '№ ВД' cell is treated as
            # a comment and dropped with a warning
            number, *comment = row[VDCOL].split('\n')
            row[VDCOL] = number
            if comment:
                assert len(comment) == 1
                print("warning: {}: comment on ТВО {}, ВД {} ignored: '{}'"
                      .format(entry.name, row[TVOCOL], row[VDCOL],
                              comment[0]),
                      file=stderr)
            # FIXME newline check in CPython's Modules/_csv.c:1034-6 is broken?
            assert not any(val and '\n' in val for val in row)
            sink.writerow(row)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from csv import Dialect, excel, QUOTE_NONE, register_dialect | |
# https://tools.ietf.org/html/rfc4180
class ietf(excel):
    """The csv.excel dialect with a bare LF as the line terminator."""

    # use universal newlines: the final line ending is left to the newline
    # setting of the text stream being written
    lineterminator = '\n'


register_dialect('ietf', ietf)
# https://www.iana.org/assignments/media-types/text/tab-separated-values
class ietf_tab(Dialect):
    """Tab-separated dialect with quoting switched off."""

    quoting = QUOTE_NONE
    delimiter = '\t'
    # use universal newlines: the final line ending is left to the newline
    # setting of the text stream being written
    lineterminator = '\n'


register_dialect('ietf-tab', ietf_tab)
# ietf-tab with double quotes prohibited for compatibility
class ietf_tab_compat(ietf_tab):
    """Variant of ietf_tab that declares the double quote as quote character.

    Quoting itself stays disabled (inherited from ietf_tab).
    """

    quotechar = '"'


register_dialect('ietf-tab-compat', ietf_tab_compat)
# for full compliance, set newline='\r\n' in TextIOWrapper as well |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Generates TSV if called as jsonseq2tsv and CSV otherwise | |
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all) | |
from csv import DictWriter | |
from simplejsonseq import load | |
from sys import argv, stdin, stdout | |
import ietfcsv # ietf and ietf-tab-compat CSV dialects | |
from os.path import basename

# Choose the output dialect from the name the script was invoked under.
# Only the last path component is examined, so a 'tsv' that merely appears
# in the directory part of argv[0] cannot switch the format by accident.
dialect = 'ietf-tab-compat' if 'tsv' in basename(argv[0]) else 'ietf'
stdout.reconfigure(newline='\r\n')

# Flatten each record read from stdin into one output row per history entry.
wr = None  # created lazily: the columns are only known after the first record
for station in load(stdin):
    if wr is None:
        # Column order: the record's own fields (all but the trailing
        # 'history'), then the fields of one history entry, then 'timestamp'
        fields = list(station.keys())
        last = fields.pop()
        assert last == 'history'
        fields.extend(next(iter(station['history'].values())).keys())
        fields.append('timestamp')
        wr = DictWriter(stdout, fieldnames=fields, dialect=dialect)
        wr.writeheader()
    # Popping 'history' leaves only the identifying fields in `station`,
    # which are then copied onto every history entry before it is written
    for time, row in station.pop('history').items():
        row.update(station)
        row['timestamp'] = time
        wr.writerow(row)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all) | |
from json import dump | |
from simplejsonseq import load | |
from sys import stdin, stdout | |
# Output uses CRLF line endings
stdout.reconfigure(newline='\r\n')

# Re-emit every record read from stdin as JSON followed by a line break
for record in load(stdin):
    dump(record, stdout, ensure_ascii=False)
    print(file=stdout)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment