Skip to content

Instantly share code, notes, and snippets.

@ketralnis
Created July 12, 2011 06:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ketralnis/1077499 to your computer and use it in GitHub Desktop.
Save ketralnis/1077499 to your computer and use it in GitHub Desktop.
#!/Users/dking/src/pypy/pypy/translator/goal/pypy-c
#!/usr/bin/env python2.7
import sys
import math
import bisect
import sqlite3
import os.path
import itertools
from datetime import datetime
from hashlib import md5
from collections import namedtuple, defaultdict
# note: many of the following library functions are ripped right out of reddit's
# source code https://github.com/reddit/reddit/
def in_chunks(it, size=25):
    """Yield successive lists of up to `size` items drawn from `it`.

    Every yielded chunk holds `size` items except possibly the last one,
    which holds whatever was left over. Nothing is yielded for an empty
    iterable.

    it   -- any iterable
    size -- number of items per chunk
    """
    chunk = []
    # a plain for-loop drives the iterator on both Python 2 and Python 3,
    # unlike the Python-2-only it.next() method
    for item in it:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    # flush the final, partially-filled chunk
    if chunk:
        yield chunk
def to_base(q, alphabet):
    """Render the non-negative integer `q` using the digits in `alphabet`.

    The base is len(alphabet); alphabet[0] is the zero digit. Zero is
    rendered as the literal '0'.

    Raises ValueError if q is negative or if the alphabet has fewer than
    two symbols (a base below 2 cannot represent numbers positionally).
    """
    if q < 0:
        raise ValueError("must supply a positive integer")
    base = len(alphabet)
    if base < 2:
        # base 1 would never terminate, base 0 would divide by zero
        raise ValueError("alphabet must contain at least two symbols")
    digits = []
    while q != 0:
        q, r = divmod(q, base)
        # collect least-significant digit first; reversing once at the end
        # avoids the quadratic cost of inserting at the front of the list
        digits.append(alphabet[r])
    digits.reverse()
    return "".join(digits) or '0'
def to36(q):
    """Return `q` rendered in base 36 using the digits 0-9 followed by a-z."""
    base36_digits = '0123456789abcdefghijklmnopqrstuvwxyz'
    return to_base(q, base36_digits)
class emitter(object):
    """Collects rows in memory and writes them out as comma-separated lines.

    Nothing touches the filesystem until close() is called; at that point
    the target file is (re)created and every buffered row is written.
    """

    def __init__(self, f):
        # f is the path of the file that close() will write
        self.fname = f
        self.data = []

    def emit(self, *s):
        """Buffer one row; each positional argument becomes one field."""
        self.data.append(s)

    def close(self):
        """Write every buffered row to self.fname, one line per row."""
        with open(self.fname, 'w') as out:
            # fields are stringified with str() and joined by commas; there
            # is no quoting or escaping of the field contents
            out.writelines(
                ','.join(str(field) for field in row) + '\n'
                for row in self.data
            )
class emit(object):
    """Context manager that hands out an `emitter` for the given file name.

    Entering the block creates the emitter; leaving it (normally or via an
    exception) calls the emitter's close(), which writes the buffered rows.
    """

    def __init__(self, fname):
        self.fname = fname

    def __enter__(self):
        sink = emitter(self.fname)
        self.emitter = sink
        return sink

    def __exit__(self, type, value, traceback):
        # close() runs regardless of whether the block raised; returning
        # None lets any in-flight exception keep propagating
        sink = self.emitter
        sink.close()
        del self.emitter
def hashdir(prefix, fname, postfix, l=2):
    """Return a path for `fname` inside a hash-sharded subdirectory of `prefix`.

    The subdirectory name is the first `l` hex characters of the md5 digest
    of str(fname), which spreads many files over several directories. The
    subdirectory is created if it does not exist yet.

    prefix  -- root directory of the sharded tree
    fname   -- base name of the file (converted with str())
    postfix -- suffix appended to the base name, e.g. '.csv'
    l       -- number of digest characters used for the subdirectory name

    Returns os.path.join(prefix, <digest prefix>, fname + postfix).
    """
    import errno

    fname = str(fname)
    # md5 wants bytes: a Python 2 str already is bytes, a Python 3 str has
    # to be encoded first
    raw = fname if isinstance(fname, bytes) else fname.encode('utf-8')
    h = md5(raw).hexdigest()
    dname = os.path.join(prefix, h[:l])
    # create-then-tolerate instead of check-then-create, so a directory that
    # appears between the check and the mkdir does not crash the run
    try:
        os.makedirs(dname)
    except OSError as e:
        if e.errno != errno.EEXIST or not os.path.isdir(dname):
            raise
    return os.path.join(dname, fname + postfix)
def pow2(score):
    """Map `score` to its power-of-two histogram bucket.

    Returns -1 for scores <= 0, 0 for scores below 2, and otherwise the
    largest power of two that does not exceed `score`.
    """
    if score <= 0:
        return -1
    elif score < 2:
        return 0
    try:
        # exact for integers of any size; the floating-point logarithm can
        # round across a power-of-two boundary and pick the wrong bucket
        return 1 << (score.bit_length() - 1)
    except AttributeError:
        # non-integer scores (e.g. floats) have no bit_length()
        return 2 ** int(math.log(score, 2))
# Reference point used by epoch_seconds(): the Unix epoch as a naive datetime
# (interpreted as UTC). It was referenced but never defined in this file.
epoch = datetime(1970, 1, 1)


def epoch_seconds(date):
    """Returns the number of seconds from the epoch to date. Should
    match the number returned by the equivalent function in
    postgres.

    `date` is a datetime. A naive datetime is taken to be in UTC; a
    timezone-aware one is converted to UTC first. The result is a float
    that includes the fractional (microsecond) part.
    """
    offset = date.utcoffset()
    if offset is not None:
        # normalise aware datetimes to naive UTC so they can be subtracted
        # from the naive epoch
        date = date.replace(tzinfo=None) - offset
    td = date - epoch
    return td.days * 86400 + td.seconds + (float(td.microseconds) / 1000000)
def score(ups, downs):
    """Net score of a link: upvotes minus downvotes."""
    net = ups - downs
    return net
def hot(ups, downs, date):
    """Hot rank for a link whose submission time is given as a datetime.

    Converts `date` to epoch seconds and delegates to _hot().
    """
    submitted = epoch_seconds(date)
    return _hot(ups, downs, submitted)
def _hot(ups, downs, date):
    """The hot formula. Should match the equivalent function in postgres.

    ups, downs -- vote counts
    date       -- submission time in seconds since the Unix epoch
                  (int or float)

    Returns log10 of the absolute net score (at least 1) plus a time bonus
    that carries the sign of the net score, rounded to 7 decimal places.
    """
    s = score(ups, downs)
    order = math.log10(max(abs(s), 1))
    if s > 0:
        sign = 1
    elif s < 0:
        sign = -1
    else:
        sign = 0
    # offset from a fixed reference timestamp
    seconds = date - 1134028003
    # divide by a float: with an integer `date` (as stored in the sqlite
    # dump) Python 2 would otherwise floor the time term to whole units
    return round(order + sign * seconds / 45000.0, 7)
def progress(it, verbosity=100, key=repr, estimate=None, persec=True):
    """An iterator that yields everything from `it', but prints progress
    information along the way, including time-estimates if
    possible

    it        -- the iterable to wrap; items are yielded unchanged
    verbosity -- minimum number of items between two status reports
                 (reports are also limited to at most one per second)
    key       -- callable applied to the most recent item to label each
                 report, or a false value to omit the label
    estimate  -- expected total number of items; defaults to len(it) when
                 that is available, otherwise no ETA is printed
    persec    -- whether to include items-per-second figures

    All status output goes to sys.stderr.
    """
    # stolen wholesale from reddit's utils.py
    from datetime import datetime
    from time import time
    import sys

    start = datetime.now()

    # try to guess at the estimate if we can
    if estimate is None:
        try:
            estimate = len(it)
        except Exception:
            # best effort only: generators, cursors etc. have no usable
            # length, in which case we simply run without an estimate
            pass

    def timedelta_to_seconds(td):
        return td.days * (24*60*60) + td.seconds + (float(td.microseconds) / 1000000)

    def format_timedelta(td, sep=''):
        # render as e.g. '1d2h3m4s', dropping components that are zero
        ret = []
        s = timedelta_to_seconds(td)
        if s < 0:
            neg = True
            s *= -1
        else:
            neg = False

        if s >= (24*60*60):
            days = int(s//(24*60*60))
            ret.append('%dd' % days)
            s -= days*(24*60*60)
        if s >= 60*60:
            hours = int(s//(60*60))
            ret.append('%dh' % hours)
            s -= hours*(60*60)
        if s >= 60:
            minutes = int(s//60)
            ret.append('%dm' % minutes)
            s -= minutes*60
        if s >= 1:
            seconds = int(s)
            ret.append('%ds' % seconds)
            s -= seconds

        if not ret:
            return '0s'

        return ('-' if neg else '') + sep.join(ret)

    def format_datetime(dt, show_date=False):
        if show_date:
            return dt.strftime('%Y-%m-%d %H:%M')
        else:
            return dt.strftime('%H:%M:%S')

    def deq(dt1, dt2):
        "Indicates whether the two datetimes' dates describe the same (day,month,year)"
        return dt1.date() == dt2.date()

    sys.stderr.write('Starting at %s\n' % (start,))

    # we pull items one at a time below, so we need a real iterator
    it = iter(it)

    seen = 0
    while True:
        this_chunk = 0
        thischunk_started = time()
        exhausted = False

        # the simple bit: just iterate and yield
        while (this_chunk < verbosity
               # don't print status reports more than once per second (this
               # makes picking a verbosity that's too low a little less dicey)
               or time() - thischunk_started < 1):
            try:
                # next() works on Python 2.6+ and 3, unlike it.next()
                item = next(it)
            except StopIteration:
                exhausted = True
                break
            this_chunk += 1
            seen += 1
            yield item

        if this_chunk < max(verbosity, 1):
            # we're done, the iterator is empty (the max() also covers a
            # non-positive verbosity, where an empty chunk must still stop)
            break

        now = datetime.now()
        elapsed = now - start
        thischunk_seconds = time() - thischunk_started

        if estimate:
            # the estimate is based on the total number of items that we've
            # processed in the total amount of time that's passed, so it should
            # smooth over momentary spikes in speed (but will take a while to
            # adjust to long-term changes in speed). TODO: we could take the
            # mean of the estimate based on all time and the estimate based on
            # the most recent N periods?
            remaining = ((elapsed/seen)*estimate)-elapsed
            completion = now + remaining
            count_str = ('%d/%d %.2f%%'
                         % (seen, estimate, float(seen)/estimate*100))
            completion_str = format_datetime(completion, not deq(completion,now))
            estimate_str = (' (%s remaining; completion %s)'
                            % (format_timedelta(remaining),
                               completion_str))
        else:
            count_str = '%d' % seen
            estimate_str = ''

        if key:
            key_str = ': %s' % key(item)
        else:
            key_str = ''

        # unlike the estimate, the persec count is the number per
        # second for *this* batch only, without smoothing
        if persec and thischunk_seconds > 0:
            persec_str = ' (%.1f/s)' % (float(this_chunk)/thischunk_seconds,)
        else:
            persec_str = ''

        sys.stderr.write('%s%s, %s%s%s\n'
                         % (count_str, persec_str,
                            format_timedelta(elapsed), estimate_str, key_str))

        if exhausted:
            # the source ran dry in the middle of a full chunk; don't poke
            # an already-exhausted iterator again
            break

    now = datetime.now()
    elapsed = now - start
    elapsed_seconds = timedelta_to_seconds(elapsed)
    if persec and seen > 0 and elapsed_seconds > 0:
        persec_str = ' (@%.1f/sec)' % (float(seen)/elapsed_seconds)
    else:
        persec_str = ''
    sys.stderr.write('Processed %d%s items in %s..%s (%s)\n'
                     % (seen,
                        persec_str,
                        format_datetime(start, not deq(start, now)),
                        format_datetime(now, not deq(start, now)),
                        format_timedelta(elapsed)))
# Votes: 6064280
# Links: 228664
# Max v/l: 20857
# Avg v/l: 27
def _l(l):
return l if isinstance(l, list) else list(l)
class Link(object):
    """Mutable running tally of the votes cast on a single link.

    __slots__ keeps instances small, since one is created per link.
    """
    __slots__ = ['linkid', 'ups', 'downs', 'timestamp']

    def __init__(self, linkid, ups, downs, timestamp):
        self.linkid, self.ups = linkid, ups
        self.downs, self.timestamp = downs, timestamp

    @property
    def score(self):
        """Upvotes minus downvotes."""
        net = self.ups - self.downs
        return net

    @property
    def hot(self):
        """Hot rank computed by _hot() from the current tallies and timestamp."""
        return _hot(self.ups, self.downs, self.timestamp)

    def copy(self):
        """Return a new, independent Link carrying the same field values."""
        # the slot order is the same as the constructor's argument order
        fields = [getattr(self, name) for name in Link.__slots__]
        return Link(*fields)
def _emit_histogram(fname, hist):
    """Write a {bucket: count} dict to `fname` as CSV rows sorted by bucket."""
    with emit(fname) as e:
        for bucket, count in sorted(hist.items()):
            e.emit(bucket, count)


def _gather_histograms(sl, minvote, maxvote, numvotes, verbosity):
    """First pass (ordered by link): per-link CSVs plus the global histograms."""
    scorehist = {}
    numvoteshist = {}
    timeofdayhist = {}
    dirhist = {0: 0, 1: 0, -1: 0}

    print('Preparing DB cursor for histogram gathering')
    # the dump was filtered by vote time, so we probably have votes for old
    # links in the DB that this query will ignore
    cursor = sl.execute("""select v.link,
                                  v.direction,
                                  v.timestamp,
                                  l.timestamp
                             from votes v,
                                  links l
                            where v.link = l.link_id
                              and l.timestamp > ?
                              and l.timestamp < ?
                            order by v.link, v.timestamp""",
                        (minvote, maxvote))
    rows = progress(cursor, verbosity, estimate=numvotes, key=lambda x: x[0])

    for l, vs in itertools.groupby(rows, key=lambda x: x[0]):
        # since we had the DB do the sorting for us, these are already
        # sorted by timestamp
        byhour = {}
        link = Link(l, 0, 0, 0)

        # this loop also sets the timestamp and score
        with emit(hashdir('data/scorebytime', l, '.csv')) as sbt, \
                emit(hashdir('data/votesbyhour', l, '.csv')) as vbh:
            for linkid, direction, vtimestamp, ltimestamp in vs:
                if link.timestamp == 0:
                    link.timestamp = ltimestamp

                if direction == 1:
                    link.ups += 1
                elif direction == -1:
                    link.downs += 1

                sbt.emit(vtimestamp, link.score)

                dt = datetime.utcfromtimestamp(vtimestamp)
                timeofdayhist[dt.hour] = timeofdayhist.get(dt.hour, 0) + 1

                dirhist[direction] += 1

                # whole hours since the epoch; floor explicitly so that
                # float timestamps land in hourly buckets too
                h = int(vtimestamp // 3600)
                byhour[h] = byhour.get(h, 0) + 1

            for h, count in sorted(byhour.items()):
                vbh.emit(h, count)

        sp = pow2(link.score)
        scorehist[sp] = scorehist.get(sp, 0) + 1

        if link.ups + link.downs > 0:
            # otherwise all votes on this link were non-votes
            nvp = pow2(link.ups + link.downs)
            numvoteshist[nvp] = numvoteshist.get(nvp, 0) + 1

    _emit_histogram('data/scorehist.csv', scorehist)
    _emit_histogram('data/numvoteshist.csv', numvoteshist)
    _emit_histogram('data/timeofdayhist.csv', timeofdayhist)
    _emit_histogram('data/dirhist.csv', dirhist)


def _replay_snapshots(sl, minvote, maxvote, numvotes, verbosity):
    """Second pass (ordered by time): periodic snapshots of the hottest links."""
    links = {}
    snapshotchunksize = 10*1000
    snapshotlength = 200

    # this has to be a whole second pass because we need it to be ordered by
    # time instead of by link id
    print('Preparing DB cursor for chunked replay')
    # NOTE(review): "v.timestamp as integer" only aliases the column, it does
    # not cast it -- confirm whether cast(v.timestamp as integer) was meant
    cursor = sl.execute("""select v.link,
                                  v.direction,
                                  v.timestamp as integer,
                                  l.timestamp
                             from votes v,
                                  links l
                            where v.link = l.link_id
                              and l.timestamp > ?
                              and l.timestamp < ?
                            order by v.timestamp""",
                        (minvote, maxvote))
    rows = progress(cursor, verbosity, estimate=numvotes, key=lambda x: x[0])

    with emit('data/snapshots.csv') as e:
        for chunk in in_chunks(rows, snapshotchunksize):
            for linkid, direction, vtimestamp, ltimestamp in chunk:
                try:
                    tally = links[linkid]
                except KeyError:
                    tally = links[linkid] = Link(linkid, 0, 0, ltimestamp)

                if direction == -1:
                    tally.downs += 1
                elif direction == 1:
                    tally.ups += 1

            topfew = sorted(links.values(), key=lambda x: x.hot, reverse=True)
            topfew = topfew[:snapshotlength]

            # vtimestamp is the time of the last vote in this chunk, which
            # serves as the timestamp of the snapshot
            for n, top in enumerate(topfew):
                e.emit(vtimestamp, top.linkid, top.hot, n)


def report(slname):
    """Generate the CSV reports under data/ from the sqlite dump `slname`.

    Makes two passes over the votes joined with their links (restricted to
    links created inside the time range covered by the votes):

    1. ordered by link: per-link score-over-time and votes-per-hour files,
       plus histograms of score, vote count, hour of day and direction;
    2. ordered by time: every 10000 votes, a snapshot of the 200 hottest
       links.

    Progress is reported on stderr; the database connection is closed
    before returning, even on error.
    """
    sl = sqlite3.connect(slname)
    try:
        # I just know how many of these there are
        # (only used as the total for the progress estimate)
        numvotes = 5362884

        print('Finding min/max of votes')
        minvote, maxvote = sl.execute(
            "select min(timestamp), max(timestamp) from votes").fetchone()
        if minvote is None:
            # min()/max() over an empty table yield NULL
            print('No votes found; nothing to report')
            return
        print('Min: %s, Max: %s (%.2fs)' % (minvote, maxvote, maxvote - minvote))

        verbosity = 10000

        _gather_histograms(sl, minvote, maxvote, numvotes, verbosity)
        _replay_snapshots(sl, minvote, maxvote, numvotes, verbosity)
    finally:
        sl.close()
if __name__ == '__main__':
    # the only argument is the path of the sqlite database to analyse
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s <sqlite-db-file>\n' % sys.argv[0])
        sys.exit(2)
    report(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment