Skip to content

Instantly share code, notes, and snippets.

@alexshpilkin
Last active Apr 14, 2019
Embed
What would you like to do?
Aggregate multiple CVK scrapes into precinct histories
#!/usr/bin/env python3
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all)
from csv import DictReader
from datetime import datetime as DateTime
from os import scandir
from simplejsonseq import dump
from sys import argv, stderr, stdout
import ietfcsv # ietf-tab CSV dialect
# Snapshot filenames are UTC timestamps in this strptime format.
NAME = '%Y%m%dT%H%MZ.tsv'
# Per-precinct identity columns, hoisted out of the per-snapshot history.
HEAD = ('region №', 'region', 'ТВО №', 'Центр ТВО', '№ ВД')

def without(mapping, *keys):
    """Return a shallow copy of *mapping* omitting the given *keys*.

    NOTE(review): the parameter was renamed from ``dict`` to avoid
    shadowing the builtin; every call site in this script passes it
    positionally, so the rename is call-compatible.
    """
    omit = set(keys)
    return {k: v for k, v in mapping.items() if k not in omit}
# Aggregated precinct rows, keyed by (ТВО №, № ВД).
rows = {}
# Output row order, taken from the last snapshot file processed.
keys = []

# A leading "-v" argument turns on progress tracing to stderr.
_VERBOSE = len(argv) > 1 and argv[1] == '-v'

def trace(*args, **named):
    """Print a progress message to stderr when -v was given, else no-op."""
    if _VERBOSE:
        print(*args, **named, file=stderr)
# Merge every snapshot file into per-precinct histories.  Filenames are
# UTC timestamps (NAME), so lexicographic order is chronological order.
for entry in sorted(scandir(), key=lambda e: e.name):
    if not entry.name.endswith('.tsv'): continue
    trace(entry.name, end=': ')
    # newline='\r\n': only CRLF terminates records, so a bare '\n'
    # embedded inside a field survives intact.
    with open(entry.name, newline='\r\n') as tsv:
        keys = []  # row order of this (ultimately the last) file
        time = DateTime.strptime(entry.name, NAME)
        prevlen, updated = len(rows), 0
        for line in DictReader(tsv, dialect='ietf-tab'):
            # Drop the derived per-district counters; they are not
            # per-precinct data.
            del line['Кількість виборчих дільниць в окрузі']
            del line['Кількість виборчих дільниць щодо яких '
                     'надійшли відомості']
            assert 'timestamp' not in line
            line['timestamp'] = time.strftime('%Y-%m-%dT%H:%MZ')
            # Split identity columns (HEAD) off from the per-snapshot
            # payload; what remains in `line` becomes a history entry.
            row = {k: line.pop(k) for k in HEAD}
            row['history'] = [line]
            key = (row['ТВО №'], int(row['№ ВД']))
            keys.append(key)
            # Insert the precinct if new; otherwise keep the existing row.
            row = rows.setdefault(key, row)
            # Append to history only when something besides the timestamp
            # changed.  (For a freshly inserted row, history[-1] is `line`
            # itself, so the comparison is false and nothing is appended.)
            if (without(row['history'][-1], 'timestamp') !=
                    without(line, 'timestamp')):
                row['history'].append(line)
                updated += 1
        trace('{} lines, {} inserted, {} updated'
              .format(len(keys), len(rows)-prevlen, updated))
        if len(rows)-prevlen == 0 and updated == 0:
            print('warning: {}: no changes'.format(entry.name),
                  file=stderr)
        # Every key seen in this file is already in the aggregate; warn
        # about precincts that are in the aggregate but missing here.
        assert set(keys) <= set(rows)
        if len(keys) < len(rows):
            for k, row in rows.items():
                if k in keys: continue
                print('warning: {}: ТВО {}, ВД {} missing'
                      .format(entry.name,
                              row['ТВО №'],
                              row['№ ВД']),
                      file=stderr)
# Re-key each precinct's chronological history list into a mapping from
# timestamp to the remaining fields.
# Not {r.pop('timestamp'): r for r ...} because CPython evaluates
# dictionary comprehensions in the wrong order (#29652).
# Only the values of `rows` are needed here, not the keys.
for row in rows.values():
    row['history'] = {r['timestamp']: without(r, 'timestamp')
                      for r in row['history']}

# CRLF newlines on stdout, then emit every precinct — in the row order
# of the last snapshot — as a JSON text sequence.
stdout.reconfigure(newline='\r\n')
dump((rows[k] for k in keys), stdout, ensure_ascii=False, indent='\t')
#!/usr/bin/env python3
from csv import reader as Reader, writer as Writer
from datetime import datetime as DateTime
from io import TextIOWrapper
from os import scandir
from sys import argv, stderr
from zipfile import ZipFile
import ietfcsv # ietf-tab-compat CSV dialect
# Input archive names embed the scrape time in GMT.
ZIPNAME = 'voting_results_%Y_%m_%d_%H_%M_GMT.zip'
# Output TSV names reuse the same instant as a compact UTC timestamp.
TSVNAME = '%Y%m%dT%H%MZ.tsv'
TVOCOL = 2  # column index of 'ТВО №'
VDCOL = 6   # column index of '№ ВД'

# A leading "-v" argument turns on progress tracing to stderr.
_VERBOSE = len(argv) > 1 and argv[1] == '-v'

def trace(*args, **named):
    """Print a progress message to stderr when -v was given, else no-op."""
    if _VERBOSE:
        print(*args, **named, file=stderr)
# Canonical header: taken from the newest archive; older files' headers
# are renamed to match it.
head = None
# Process archives newest-first (names embed the timestamp, so reverse
# lexicographic order is reverse chronological order).
for entry in reversed(sorted(scandir(), key=lambda e: e.name)):
    if not entry.name.endswith('.zip'): continue
    trace(entry.name, end=' -> ')
    # The archive name encodes the scrape time; reuse it for the output
    # filename.
    time = DateTime.strptime(entry.name, ZIPNAME)
    with ZipFile(entry.name) as archive, \
         TextIOWrapper(archive.open('precincts.txt')) as txt, \
         open(time.strftime(TSVNAME), 'w', newline='\r\n') as tsv:
        trace(tsv.name)
        rd = Reader(txt, delimiter='\t')
        wr = Writer(tsv, dialect='ietf-tab-compat')
        # First row is the column header; sanity-check the key columns.
        h = next(rd)
        assert h[TVOCOL] == 'ТВО №' and h[VDCOL] == '№ ВД'
        if head is None:
            head = h
        if head != h:
            # Column count must match; report each renamed column.
            assert len(head) == len(h)
            for col, c in zip(head, h):
                if col == c: continue
                print("warning: {}: renaming column '{}' "
                      "to '{}'".format(entry.name, c, col),
                      file=stderr)
        # Always write the canonical (newest) header.
        wr.writerow(head)
        for row in rd:
            # A free-text comment may follow the precinct number after an
            # embedded newline inside the cell; strip it and warn.
            row[VDCOL], *comment = row[VDCOL].split('\n')
            if comment:
                assert len(comment) == 1
                print("warning: {}: comment on ТВО {}, ВД {} "
                      "ignored: '{}'".format(entry.name,
                      row[TVOCOL], row[VDCOL], comment[0]),
                      file=stderr)
            # FIXME newline check in CPython's Modules/_csv.c:1034-6 is broken?
            assert all(not val or '\n' not in val for val in row)
            wr.writerow(row)
from csv import Dialect, excel, QUOTE_NONE, register_dialect
# RFC 4180 CSV: https://tools.ietf.org/html/rfc4180
class ietf(excel):
    # Like excel, but emit bare '\n' so universal newlines apply; for
    # full compliance, set newline='\r\n' in the TextIOWrapper as well.
    lineterminator = '\n'

# IANA text/tab-separated-values:
# https://www.iana.org/assignments/media-types/text/tab-separated-values
class ietf_tab(Dialect):
    # Tab-separated values with quoting disabled entirely.
    quoting = QUOTE_NONE
    delimiter = '\t'
    lineterminator = '\n'  # use universal newlines

# ietf-tab with double quotes prohibited for compatibility: with
# QUOTE_NONE set, declaring a quotechar makes the writer reject fields
# containing it.
class ietf_tab_compat(ietf_tab):
    quotechar = '"'

register_dialect('ietf', ietf)
register_dialect('ietf-tab', ietf_tab)
register_dialect('ietf-tab-compat', ietf_tab_compat)
#!/usr/bin/env python3
# Generates TSV if called as jsonseq2tsv and CSV otherwise
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all)
from csv import DictWriter
from simplejsonseq import load
from sys import argv, stdin, stdout
import ietfcsv # ietf and ietf-tab-compat CSV dialects
# Pick the output dialect from the invoked name (argv[0]): TSV when it
# contains "tsv", RFC 4180 CSV otherwise.
dialect = 'ietf-tab-compat' if 'tsv' in argv[0] else 'ietf'

stdout.reconfigure(newline='\r\n')  # CRLF line endings on output

writer = None
for station in load(stdin):
    if writer is None:
        # Derive the header from the first record: identity columns
        # first, then the per-snapshot columns, then the timestamp.
        columns = list(station.keys())
        last = columns.pop()
        assert last == 'history'
        first_snapshot = next(iter(station['history'].values()))
        columns += list(first_snapshot.keys())
        columns.append('timestamp')
        writer = DictWriter(stdout, fieldnames=columns, dialect=dialect)
        writer.writeheader()
    # Flatten the history: one output row per timestamped snapshot,
    # with the identity columns repeated on every row.
    for stamp, snapshot in station.pop('history').items():
        snapshot.update(station)
        snapshot['timestamp'] = stamp
        writer.writerow(snapshot)
#!/usr/bin/env python3
# NB Depends on dict preserving the insert order (CPython >= 3.6, PyPy all)
from json import dump
from simplejsonseq import load
from sys import stdin, stdout
# CRLF line endings on output for RFC compliance.
stdout.reconfigure(newline='\r\n')

# Convert the JSON sequence on stdin to one JSON document per line on
# stdout, preserving non-ASCII characters.
for record in load(stdin):
    dump(record, stdout, ensure_ascii=False)
    stdout.write('\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment