Skip to content

Instantly share code, notes, and snippets.

@ants
Last active January 18, 2024 15:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ants/0a684161ff07e386f1b1f71f011630d6 to your computer and use it in GitHub Desktop.
Save ants/0a684161ff07e386f1b1f71f011630d6 to your computer and use it in GitHub Desktop.
Simple Python implementation of the One Billion Row Challenge (1BRC).
from collections import defaultdict
from functools import reduce
try:
    from itertools import pairwise
except ImportError:
    # itertools.pairwise is missing on older interpreters (the original
    # note mentions PyPy); fall back to an eager, list-based version.
    def pairwise(seq):
        """Return an iterator over consecutive overlapping pairs of *seq*.

        The input is materialised into a list first, so it works for any
        iterable but is not lazy.
        """
        items = list(seq)
        return zip(items[:-1], items[1:])
import multiprocessing
import mmap
import os
import sys
class StatEntry:
    """Running min / max / sum / count accumulator for one city's measurements.

    Feed single values in with ``update``. Combine two partial accumulators
    with ``+``, which returns a fresh entry and leaves both operands untouched.
    """

    # update() runs once per input row, so dropping the per-instance __dict__
    # makes attribute access (and each instance) cheaper.
    __slots__ = ('min', 'max', 'sum', 'count')

    def __init__(self):
        # +/-inf are the identities for min()/max(). An empty entry therefore
        # never clamps real data, and merging with an empty entry is a no-op
        # (finite sentinels such as +/-1e6 would clamp values beyond them).
        self.min = float('inf')
        self.max = float('-inf')
        self.sum = 0.0
        self.count = 0

    def update(self, measurement):
        """Fold one measurement into the running statistics."""
        if measurement < self.min:
            self.min = measurement
        if measurement > self.max:
            self.max = measurement
        self.sum += measurement
        self.count += 1

    def __add__(self, other):
        """Return a new entry combining the statistics of *self* and *other*."""
        s = StatEntry()
        s.min = min(self.min, other.min)
        s.max = max(self.max, other.max)
        s.sum = self.sum + other.sum
        s.count = self.count + other.count
        return s

    def __repr__(self):
        return (f"{type(self).__name__}(min={self.min!r}, max={self.max!r}, "
                f"sum={self.sum!r}, count={self.count!r})")
def merge(a, b):
    """Combine two ``city -> StatEntry`` mappings into a new plain dict.

    Keys present in both mappings get ``a[key] + b[key]``. Keys present in
    only one mapping keep that mapping's value.

    Neither input is modified. Missing keys are handled explicitly rather
    than relying on defaultdict auto-creation. This makes the function safe
    as a ``reduce`` step, where the accumulator is a previous merge() result
    (a plain dict).
    """
    merged = dict(a)
    for key, entry in b.items():
        merged[key] = merged[key] + entry if key in merged else entry
    return merged
def collect_stats(filename, start, end):
    """Aggregate per-city statistics for the lines in byte range [start, end).

    Each line is expected to look like ``city;measurement``. The city is kept
    as raw bytes and the measurement is parsed with ``float``. Every line
    that *starts* before ``end`` is consumed, so *start* and *end* should lie
    on line boundaries (as produced by ``get_segments``).

    Returns a ``defaultdict`` mapping city bytes to ``StatEntry``.
    """
    stats = defaultdict(StatEntry)
    if start >= end:
        # Nothing to read. This also avoids mmap's ValueError on empty files.
        return stats
    # Read-only mapping: the file is never written, so it does not need write
    # permission. Both the mapping and the file are closed on exit.
    with open(filename, 'rb') as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        mm.seek(start)
        while mm.tell() < end:
            city, measurement = mm.readline().split(b';', 1)
            stats[city].update(float(measurement))
    return stats
def align(f, pos):
    """Return the offset just past the first newline at or after *pos*.

    The rest of the line containing *pos* is skipped. If no newline follows,
    the result is the end of the data. *f* is a seekable binary file, and its
    position is left at the returned offset.
    """
    f.seek(pos)
    skipped = f.readline()
    return pos + len(skipped)
def get_segments(filename, n):
    """Split *filename* into contiguous ``(start, end)`` byte ranges on line boundaries.

    The nominal cut points ``i * size // n`` (for ``i`` in ``1..n-1``) are each
    moved forward with ``align`` to the start of the next line. Together the
    ranges cover ``[0, size)`` exactly once. Adjacent cut points can coincide
    on files with very few lines, yielding empty ranges. A non-positive *n*
    yields the single range ``(0, size)``.
    """
    size = os.path.getsize(filename)
    with open(filename, 'rb') as f:
        # Integer arithmetic keeps the cut points exact even for huge files.
        cuts = [align(f, i * size // n) for i in range(1, n)]
    return pairwise([0] + cuts + [size])
if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} MEASUREMENTS_FILE")
    filename = sys.argv[1]
    num_procs = multiprocessing.cpu_count()
    # Using the pool as a context manager shuts the workers down as soon as
    # the partial results are collected.
    with multiprocessing.Pool(num_procs) as pool:
        partials = pool.starmap(
            collect_stats,
            [(filename, start, end)
             for start, end in get_segments(filename, num_procs)],
        )
    stats = reduce(merge, partials)
    print(", ".join(
        f"{c.decode('utf8')}: {s.min:0.1f}/{s.sum / s.count:0.1f}/{s.max:0.1f}"
        for c, s in sorted(stats.items())))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment