Skip to content

Instantly share code, notes, and snippets.

@renthraysk
Last active March 23, 2024 15:48
Show Gist options
  • Save renthraysk/80d5ed7135671e1af5e6af3e2bd3ddce to your computer and use it in GitHub Desktop.
Save renthraysk/80d5ed7135671e1af5e6af3e2bd3ddce to your computer and use it in GitHub Desktop.
Initial pass at the 1BRC
package main
import (
"bytes"
"errors"
"flag"
"io"
"log"
"os"
"sort"
"sync"
)
// row holds the running aggregate for one city name. Readings are kept as
// integers in tenths (the parser scales by ten and the formatter prints one
// fractional digit).
type row struct {
	name     string // city name; also the key under which the row is stored
	sum      int64  // total of every reading folded into this row
	count    uint32 // number of readings folded into sum
	min, max int16  // smallest and largest reading seen so far
}
// appendLen estimates how many bytes append will add for this row: the length
// of the name plus the widest "=min/mean/max, " rendering in which every value
// has a sign and two integer digits.
func (r *row) appendLen() int {
	const widest = "=-00.0/-00.0/-00.0, "
	return len(widest) + len(r.name)
}
// append writes the row to b as "name=min/mean/max, " and returns the extended
// slice. The trailing ", " separator is always emitted.
//
// NOTE(review): the mean is computed with integer division, so it is truncated
// toward zero rather than rounded — confirm this is the intended rounding.
func (r *row) append(b []byte) []byte {
	out := append(b, r.name...)
	out = appendFixedFloat(append(out, '='), r.min)
	out = appendFixedFloat(append(out, '/'), r.sum/int64(r.count))
	out = appendFixedFloat(append(out, '/'), r.max)
	return append(out, ", "...)
}
// appendFixedFloat appends i, interpreted as a count of tenths, to p as a
// decimal number with exactly one fractional digit (123 -> "12.3", -5 ->
// "-0.5") and returns the extended slice.
//
// The integer part may have any number of digits; there is always at least
// one ("0.5", never ".5").
func appendFixedFloat[T int16 | int64](p []byte, i T) []byte {
	// Work on the unsigned magnitude so that negating the most negative value
	// of T cannot overflow. The conversion sign-extends, so two's-complement
	// negation of the uint64 yields |i|.
	u := uint64(i)
	if i < 0 {
		p = append(p, '-')
		u = -u
	}
	whole, frac := u/10, byte(u%10)

	// Render the integer part right-to-left into a scratch array; 20 bytes is
	// enough for any uint64.
	var digits [20]byte
	n := len(digits)
	for {
		n--
		digits[n] = byte(whole%10) + '0'
		whole /= 10
		if whole == 0 {
			break
		}
	}
	p = append(p, digits[n:]...)
	return append(p, '.', frac+'0')
}
// main parses the -input flag, aggregates the named file and writes the
// formatted results to standard output. Any processing or write failure is
// logged and terminates the program with a non-zero exit status.
func main() {
	var name string
	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	fs.StringVar(&name, "input", "/data/measurements.txt", "file to load")
	if err := fs.Parse(os.Args[1:]); err != nil {
		// With flag.ExitOnError, Parse exits the process itself on failure.
		panic("unreachable")
	}
	s, err := processFile(name)
	if err != nil {
		log.Fatalf("failed to process %q: %v", name, err)
	}
	// A failed write (closed pipe, full disk, ...) must not pass silently.
	if err := s.writeResults(os.Stdout); err != nil {
		log.Fatalf("failed to write results: %v", err)
	}
}
// processFile opens the named file and aggregates its contents with
// processReader, using 4 MiB chunks and four workers. The file is closed
// before returning.
func processFile(name string) (stats, error) {
	const (
		chunkSize   = 4 << 20
		workerCount = 4
	)
	in, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer in.Close()
	return processReader(in, chunkSize, workerCount)
}
// processReader reads r in chunks, hands runs of complete lines to
// workerCount goroutines for aggregation, and merges the per-worker results
// into a single stats map.
//
// chunkSize is the initial size of each read buffer; a buffer grows when a
// single line does not fit. Values below 1 for chunkSize or workerCount are
// treated as 1. A final line that is not terminated by '\n' is still
// processed. Any read error other than io.EOF is returned with a nil map.
func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) {
	// Degenerate arguments would otherwise deadlock (no workers) or make no
	// progress (zero-length reads).
	chunkSize = max(chunkSize, 1)
	workerCount = max(workerCount, 1)

	var wg sync.WaitGroup
	workCh := make(chan []byte, workerCount)
	// One buffer per worker plus the one the reader holds are in circulation,
	// so sends on freeCh never block.
	freeCh := make(chan []byte, 1+workerCount)
	statsCh := make(chan stats, workerCount)
	for range workerCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker contributes one buffer to the free pool.
			freeCh <- make([]byte, chunkSize)
			s := make(stats, 1<<10)
			for lines := range workCh {
				s.processLines(lines)
				// lines keeps the capacity of the buffer it was cut from, so
				// handing it back recycles the whole buffer.
				freeCh <- lines
			}
			statsCh <- s
		}()
	}

	// buf always holds the not-yet-dispatched tail of the input, starting at
	// the beginning of a line.
	buf := make([]byte, 0, chunkSize)
	var err error
	for {
		if len(buf) == cap(buf) {
			// A single line fills the buffer: grow it, otherwise Read would
			// be called with no room and the loop would never advance.
			grown := make([]byte, len(buf), 2*cap(buf))
			copy(grown, buf)
			buf = grown
		}
		var n int
		n, err = r.Read(buf[len(buf):cap(buf)])
		buf = buf[:len(buf)+n]
		if err != nil {
			break
		}
		lines, broken := cutAfterLast(buf, '\n')
		if len(lines) == 0 {
			// No complete line yet; keep reading into the same buffer.
			continue
		}
		// Copy the trailing partial line into a fresh buffer BEFORE handing
		// the old one to a worker, so reader and worker never share memory.
		next := <-freeCh
		next = append(next[:0], broken...)
		workCh <- lines
		buf = next
	}
	if errors.Is(err, io.EOF) && len(buf) > 0 {
		// Whatever is left at EOF, including a final line without a trailing
		// newline, goes out as the last chunk.
		workCh <- buf
	}
	close(workCh)
	wg.Wait()
	close(freeCh)
	close(statsCh)
	if !errors.Is(err, io.EOF) {
		return nil, err
	}

	// Fold every other worker's map into the first one received.
	s := <-statsCh
	for m := range statsCh {
		for name, row := range m {
			if r, ok := s[name]; ok {
				r.sum += row.sum
				r.count += row.count
				r.min = min(r.min, row.min)
				r.max = max(r.max, row.max)
				continue
			}
			s[name] = row
		}
	}
	return s, nil
}
// stats maps a city name to the row that aggregates its readings.
type stats map[string]*row
// processLines folds every "city;value\n" record in lines into s. Rows for
// cities seen for the first time are carved out of a slab of 64 rows that is
// replaced whenever it runs out, so new cities do not cost one allocation each.
func (s stats) processLines(lines []byte) {
	var slab []row
	for len(lines) != 0 {
		var city, value []byte
		city, lines = cut(lines, ';')
		value, lines = cut(lines, '\n')
		temp := parseTemp(value)

		r, seen := s[string(city)]
		if !seen {
			if len(slab) == 0 {
				slab = make([]row, 64)
			}
			// Take the next unused row from the slab.
			r, slab = &slab[0], slab[1:]
			*r = row{
				name:  string(city),
				sum:   int64(temp),
				count: 1,
				min:   temp,
				max:   temp,
			}
			s[r.name] = r
			continue
		}

		r.sum += int64(temp)
		r.count++
		r.min = min(r.min, temp)
		r.max = max(r.max, temp)
	}
}
// writeResults writes every row to w, sorted by name, as a single line of the
// form "{a=min/mean/max, b=min/mean/max}\n". Entries are separated by ", "
// with no separator after the last one. Output is staged in an 8 KiB buffer
// that is flushed to w whenever the next row might not fit. The first write
// error is returned.
func (s stats) writeResults(w io.Writer) error {
	ordered := make([]*row, 0, len(s))
	for _, r := range s {
		ordered = append(ordered, r)
	}
	sort.Slice(ordered, func(i, j int) bool { return ordered[i].name < ordered[j].name })

	b := make([]byte, 0, 8<<10)
	b = append(b, '{')
	for _, r := range ordered {
		if r.appendLen() > cap(b)-len(b) {
			// Flush before appending so the buffer normally never reallocates.
			n, err := w.Write(b)
			if err != nil {
				return err
			}
			// Keep anything the writer did not consume.
			b = append(b[:0], b[n:]...)
		}
		b = r.append(b)
	}
	// row.append always ends with ", ". Flushing only happens before a row is
	// appended, so the last row's separator is still in b here and can be
	// dropped to avoid emitting ", }".
	b = bytes.TrimSuffix(b, []byte(", "))
	b = append(b, '}', '\n')
	_, err := w.Write(b)
	return err
}
// parseTemp converts a textual reading such as "-12.3" into tenths (-123).
// Empty input yields 0; a leading '-' negates the value parsed from the rest.
func parseTemp(b []byte) int16 {
	switch {
	case len(b) == 0:
		return 0
	case b[0] == '-':
		return -parseNonNegative(b[1:])
	default:
		return parseNonNegative(b)
	}
}
// parseNonNegative parses an unsigned decimal with at most one fractional
// digit and returns it in tenths ("12.3" -> 123, "5" -> 50). Parsing of the
// integer part stops at the first non-digit; a fractional digit is only used
// when that byte is '.' and is followed by a digit. Anything after that is
// ignored. The result is narrowed to int16 without a range check.
func parseNonNegative(b []byte) int16 {
	tenths, pos := 0, 0
	for pos < len(b) {
		// Bytes below '0' wrap around to large values, so one comparison
		// rejects everything that is not an ASCII digit.
		digit := b[pos] - '0'
		if digit > 9 {
			break
		}
		tenths = (tenths + int(digit)) * 10
		pos++
	}

	rest := b[pos:]
	if len(rest) >= 2 && rest[0] == '.' {
		if frac := rest[1] - '0'; frac <= 9 {
			tenths += int(frac)
		}
	}
	return int16(tenths)
}
// cut splits s around the first occurrence of c, returning the bytes before
// and after it (c itself is dropped). When c does not occur, it returns s
// unchanged and a nil suffix.
func cut(s []byte, c byte) (prefix []byte, suffix []byte) {
	at := bytes.IndexByte(s, c)
	if at < 0 {
		return s, nil
	}
	return s[:at], s[at+1:]
}
// cutAfterLast splits b immediately after the last occurrence of c. The first
// result ends with c; the second holds whatever follows it. When c does not
// occur, the first result is nil and the second is b itself.
func cutAfterLast(b []byte, c byte) ([]byte, []byte) {
	end := bytes.LastIndexByte(b, c) + 1
	if end == 0 {
		return nil, b
	}
	return b[:end], b[end:]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment