Skip to content

Instantly share code, notes, and snippets.

@abihf
Created March 16, 2024 20:58
Show Gist options
  • Save abihf/6e339d8dcc75d9d3a7474800bea25b7b to your computer and use it in GitHub Desktop.
Save abihf/6e339d8dcc75d9d3a7474800bea25b7b to your computer and use it in GitHub Desktop.
Golang Implementation for 1 Billion Rows Challenge
package main
import (
"bufio"
"bytes"
"errors"
"fmt"
"hash/maphash"
"io"
"os"
"runtime/pprof"
"slices"
"sync"
"sync/atomic"
)
const (
	// bufSize is the number of bytes requested from the input per read.
	bufSize = 16 << 20 // 16 MiB
	// workserSize is the number of worker goroutines started by main.
	workserSize = 23
	// maxLineLen bounds the partial line carried over from one chunk to the
	// next; pooled buffers reserve this much extra room in front of each read.
	maxLineLen = 128
)

var (
	// bufPool recycles chunk buffers between the reader and the workers.
	bufPool = sync.Pool{New: func() any {
		return make([]byte, bufSize+maxLineLen)
	}}

	// hashSeed is shared by every worker so that equal station names hash to
	// the same key in every per-worker map.
	hashSeed = maphash.MakeSeed()

	// totalLine accumulates the number of lines parsed by all workers.
	totalLine atomic.Uint64
)
// tempData is the running aggregate for a single station.
//
// Temperatures are stored as integers in tenths of a degree. The fields add
// up to 8+4+2+2+48 = 64 bytes; keep that in mind before adding or widening
// fields.
type tempData struct {
	sum   int64  // total of all readings (8 bytes)
	count uint32 // number of readings folded into sum (4 bytes)
	min   int16  // smallest reading seen (2 bytes)
	max   int16  // largest reading seen (2 bytes)
	// name holds the station name, zero-padded on the right. Names longer
	// than the array are truncated when they are copied in.
	name [48]byte
}
// main aggregates the "station;temperature" lines of input.csv and writes one
// "name;mean;min;max" line per station, sorted by name, to output.csv.
//
// The input is read in large chunks that are handed to workserSize worker
// goroutines; each worker returns its own aggregate map, and the maps are
// merged here. Setting PPROF=1 additionally writes a CPU profile to cpu.prof.
// Any I/O failure aborts the program with a panic.
func main() {
	if os.Getenv("PPROF") == "1" {
		profFile, err := os.Create("cpu.prof")
		if err != nil {
			panic(fmt.Errorf("creating cpu.prof: %w", err))
		}
		defer profFile.Close()
		if err := pprof.StartCPUProfile(profFile); err != nil {
			panic(fmt.Errorf("starting CPU profile: %w", err))
		}
		defer pprof.StopCPUProfile()
	}

	resCh := make(chan map[uint64]*tempData, workserSize)
	bulkCh := make(chan []byte, workserSize*2)
	for i := 0; i < workserSize; i++ {
		go worker(bulkCh, resCh)
	}

	file, err := os.Open("input.csv")
	if err != nil {
		panic(fmt.Errorf("opening input.csv: %w", err))
	}
	readChunks(file, bulkCh)
	file.Close()
	// Closing the channel lets every worker finish and report its map.
	close(bulkCh)

	// Each worker sends exactly one result, so receive workserSize maps.
	merged := make(map[uint64]*tempData, 10_000)
	for i := 0; i < workserSize; i++ {
		for id, v := range <-resCh {
			if md, ok := merged[id]; ok {
				md.Merge(v.sum, v.count, v.min, v.max)
			} else {
				merged[id] = v
			}
		}
	}

	list := make([]*tempData, 0, len(merged))
	for _, v := range merged {
		list = append(list, v)
	}
	merged = nil
	slices.SortFunc(list, (*tempData).Compare)

	outFile, err := os.Create("output.csv")
	if err != nil {
		panic(fmt.Errorf("creating output.csv: %w", err))
	}
	w := bufio.NewWriterSize(outFile, 4*1024*1024)
	for _, v := range list {
		v.Print(w)
	}
	// bufio.Writer remembers the first write error and reports it from Flush.
	if err := w.Flush(); err != nil {
		outFile.Close()
		panic(fmt.Errorf("writing output.csv: %w", err))
	}
	if err := outFile.Close(); err != nil {
		panic(fmt.Errorf("closing output.csv: %w", err))
	}
	println("Processed", totalLine.Load(), "lines")
}

// readChunks reads r to EOF and sends it to out as chunks that always end on
// a line boundary. The partial line at the end of one read is carried over to
// the front of the next chunk. It panics on a read error or when a carried
// line is longer than maxLineLen.
func readChunks(r io.Reader, out chan<- []byte) {
	var restBytes [maxLineLen]byte
	restLen := 0
	for {
		buf := bufPool.Get().([]byte)
		// A recycled buffer may come back with a shortened length; restore it
		// so the copy of the carried bytes below is never cut short.
		buf = buf[:cap(buf)]

		// Read behind the space reserved for the carried partial line.
		n, err := r.Read(buf[restLen : restLen+bufSize])
		if n > 0 {
			copy(buf, restBytes[:restLen])
			n += restLen
			lastLn := bytes.LastIndexByte(buf[:n], '\n') + 1
			if n-lastLn > maxLineLen {
				panic(fmt.Errorf("input line longer than %d bytes", maxLineLen))
			}
			restLen = copy(restBytes[:], buf[lastLn:n])
			if lastLn > 0 {
				out <- buf[:lastLn]
			} else {
				// No complete line yet; keep accumulating in restBytes.
				bufPool.Put(buf)
			}
		} else {
			bufPool.Put(buf)
		}

		if err == nil {
			continue
		}
		if !errors.Is(err, io.EOF) {
			panic(fmt.Errorf("reading input: %w", err))
		}
		break
	}

	// The input did not end with a newline: terminate the last line ourselves
	// so it is processed instead of being dropped.
	if restLen > 0 {
		buf := bufPool.Get().([]byte)
		buf = buf[:cap(buf)]
		n := copy(buf, restBytes[:restLen])
		buf[n] = '\n'
		out <- buf[:n+1]
	}
}
// worker parses chunks of "station;temperature\n" lines received from ch and
// aggregates them per station. When ch is closed it adds its line count to
// totalLine and then sends its aggregate map on resCh, so the count is
// already recorded by the time the result is received.
//
// Stations are keyed by the maphash of their name only; two names with the
// same 64-bit hash would be folded into one entry. Names longer than the
// tempData.name array are truncated when stored.
func worker(ch chan []byte, resCh chan map[uint64]*tempData) {
	res := make(map[uint64]*tempData, 10000)
	lines := uint64(0)
	for origBuf := range ch {
		buf := origBuf
		for {
			scIdx := bytes.IndexByte(buf, ';')
			if scIdx < 0 {
				break
			}
			city := buf[:scIdx]
			buf = buf[scIdx+1:]

			// A final line without '\n' runs to the end of the chunk instead
			// of triggering an out-of-range slice.
			var raw []byte
			if lnIdx := bytes.IndexByte(buf, '\n'); lnIdx >= 0 {
				raw, buf = buf[:lnIdx], buf[lnIdx+1:]
			} else {
				raw, buf = buf, nil
			}
			val := btoi(raw)
			lines++

			hash := maphash.Bytes(hashSeed, city)
			data, ok := res[hash]
			if !ok {
				data = &tempData{sum: int64(val), min: val, max: val, count: 1}
				copy(data.name[:], city)
				res[hash] = data
				continue
			}
			data.sum += int64(val)
			data.count++
			if val < data.min {
				data.min = val
			}
			if val > data.max {
				data.max = val
			}
		}
		// Hand the buffer back at full length so the next user of the pool
		// does not inherit this chunk's shorter length.
		bufPool.Put(origBuf[:cap(origBuf)])
	}
	totalLine.Add(lines)
	resCh <- res
}
// btoi parses a decimal temperature such as "-12.3" into an integer with the
// decimal point removed (-123 for that example).
//
// An optional leading '-' negates the result. Every '.' is skipped and all
// other bytes are treated as decimal digits without validation, so "12"
// yields 12, not 120. Empty input, or a lone "-", yields 0.
func btoi(b []byte) int16 {
	isMinus := len(b) > 0 && b[0] == '-'
	if isMinus {
		b = b[1:]
	}
	// Starting from zero and folding in every byte treats the first character
	// like all the others, so empty input and a leading '.' are handled too.
	var n int16
	for _, c := range b {
		if c == '.' {
			continue
		}
		n = n*10 + int16(c-'0')
	}
	if isMinus {
		n = -n
	}
	return n
}
// Print writes the aggregate to w as one "name;mean;min;max" line. The stored
// integer values are divided by 10 and printed with one decimal place.
//
// The name is cut at its first zero byte; a name that fills the whole array
// has no zero byte and is printed in full.
func (t *tempData) Print(w io.Writer) {
	name := t.name[:]
	if end := bytes.IndexByte(name, 0); end >= 0 {
		name = name[:end]
	}
	mean := float64(t.sum) / float64(t.count) / 10
	fmt.Fprintf(w, "%s;%.1f;%.1f;%.1f\n", name, mean, float32(t.min)/10, float32(t.max)/10)
}
// Merge folds another partial aggregate into t: sum and count are added, and
// min/max widen t's range when they lie outside it.
func (t *tempData) Merge(sum int64, count uint32, min, max int16) {
	if t.max < max {
		t.max = max
	}
	if t.min > min {
		t.min = min
	}
	t.count += count
	t.sum += sum
}
// Compare orders t relative to b by the bytes of their name arrays (padding
// included). It returns a negative number, zero, or a positive number when t
// sorts before, equal to, or after b.
func (t *tempData) Compare(b *tempData) int {
	mine, theirs := t.name[:], b.name[:]
	return bytes.Compare(mine, theirs)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment