// Created March 16, 2024 20:58
// Golang implementation for the 1 Billion Row Challenge.
package main
import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"hash/maphash"
	"io"
	"os"
	"runtime/pprof"
	"slices"
	"sync"
	"sync/atomic"
)
// Tuning parameters for reading and parsing the input.
const (
	// bufSize is how many bytes are requested from the input per read.
	bufSize = 16 * 1024 * 1024 // 16 MiB
	// workserSize is the number of worker goroutines that parse chunks.
	// The misspelled name is kept because the rest of the file uses it.
	workserSize = 23
	// maxLineLen is the longest partial line that can be carried over from
	// the end of one read to the front of the next buffer.
	maxLineLen = 128
)

// bufPool hands out read buffers of bufSize+maxLineLen bytes: the first
// bytes receive the partial line left over from the previous read, and the
// following bufSize bytes receive the next read.
var bufPool = sync.Pool{New: func() any {
	return make([]byte, bufSize+maxLineLen)
}}

var (
	// hashSeed is shared by all workers so that the same station name hashes
	// to the same key in every partial map, which the merge step relies on.
	hashSeed = maphash.MakeSeed()
	// totalLine counts the rows parsed across all workers.
	totalLine atomic.Uint64
)
// maxNameLen is the capacity of tempData.name. The challenge allows station
// names of up to 100 bytes of UTF-8; a smaller array would truncate them.
const maxNameLen = 100

// tempData accumulates the statistics of one weather station. Readings are
// kept as integers in tenths of a degree (btoi parses "12.3" as 123), so all
// accumulation is integer arithmetic.
type tempData struct {
	sum      int64  // sum of all readings, in tenths of a degree
	count    uint32 // number of readings
	min, max int16  // extreme readings, in tenths of a degree
	// name is the station name, NUL-padded. A name of exactly maxNameLen
	// bytes fills the array and has no terminating NUL.
	name [maxNameLen]byte
}
func main() {
if os.Getenv("PPROF") == "1" {
profFile, _ := os.Create("")
defer profFile.Close()
defer pprof.StopCPUProfile()
resCh := make(chan map[uint64]*tempData, workserSize)
bulkCh := make(chan []byte, workserSize*2)
for i := 0; i < workserSize; i++ {
go worker(bulkCh, resCh)
file, _ := os.Open("input.csv")
// bufR := bufio.NewReader(file)
var restBytes [maxLineLen]byte
restLen := 0
for {
buf := bufPool.Get().([]byte)
n, err := file.Read(buf[restLen : restLen+bufSize])
if err != nil {
if !errors.Is(err, io.EOF) {
if n == 0 {
if restLen > 0 {
copy(buf, restBytes[:restLen])
n += restLen
lastLn := bytes.LastIndexByte(buf[:n], '\n') + 1
if lastLn < n {
restLen = copy(restBytes[:], buf[lastLn:n])
} else {
restLen = 0
bulkCh <- buf[:lastLn]
merged := make(map[uint64]*tempData, 10_000)
for i := 0; i < workserSize; i++ {
dataMap := <-resCh
for id, v := range dataMap {
if md, ok := merged[id]; ok {
md.Merge(v.sum, v.count, v.min, v.max)
// data.sum += values.sum
// data.count += values.count
// if values.min < data.min {
// data.min = values.min
// }
// if values.max > data.max {
// data.max = values.max
// }
} else {
merged[id] = v
list := make([]*tempData, 0, len(merged))
for _, v := range merged {
list = append(list, v)
merged = nil
slices.SortFunc(list, func(a, b *tempData) int {
return a.Compare(b)
outFile, _ := os.Create("output.csv")
w := bufio.NewWriterSize(outFile, 4*1024*1024)
defer outFile.Close()
for _, v := range list {
println("Processed", totalLine.Load(), "lines")
func worker(ch chan []byte, resCh chan map[uint64]*tempData) {
res := make(map[uint64]*tempData, 10000)
lines := uint64(0)
for origBuf := range ch {
buf := origBuf
for {
scIdx := bytes.IndexByte(buf, ';')
if scIdx < 0 {
city := buf[:scIdx]
buf = buf[scIdx+1:]
lnIdx := bytes.IndexByte(buf, '\n')
val := btoi(buf[:lnIdx])
buf = buf[lnIdx+1:]
hash := maphash.Bytes(hashSeed, city)
data, ok := res[hash]
if !ok {
res[hash] = &tempData{sum: int64(val), min: val, max: val, count: 1}
copy(res[hash].name[:], city)
} else {
// if bytes.Compare([:len(city)], city) != 0 {
// panic("city name collision")
// }
// data.Merge(int64(val), val, val, 1)
data.sum += int64(val)
if val < data.min {
data.min = val
if val > data.max {
data.max = val
resCh <- res
// btoi parses a temperature such as "-12.3" or "4.5" into an integer number
// of tenths of a degree (-123, 45). It accepts an optional leading '-'
// followed by ASCII digits; any '.' after the first digit is skipped, so
// "12.3" and "123" both give 123.
//
// No validation is done because this runs once per input row. b must be
// non-empty, its first character after any '-' must be a digit, and the
// result must fit in an int16 (|value| <= 3276.7); otherwise the result is
// garbage or the call panics.
func btoi(b []byte) int16 {
	isMinus := b[0] == '-'
	if isMinus {
		b = b[1:]
	}
	n := int16(b[0] - '0')
	for _, c := range b[1:] {
		if c == '.' {
			continue
		}
		n = n*10 + int16(c-'0')
	}
	if isMinus {
		n = -n
	}
	return n
}
func (t *tempData) Print(w io.Writer) {
name :=[:bytes.IndexByte([:], 0)]
fmt.Fprintf(w, "%s;%.1f;%.1f;%.1f\n", name, float64(t.sum)/float64(t.count)/10, float32(t.min)/10, float32(t.max)/10)
func (t *tempData) Merge(sum int64, count uint32, min, max int16) {
t.sum += sum
t.count += count
if min < t.min {
t.min = min
if max > t.max {
t.max = max
func (t *tempData) Compare(b *tempData) int {
return bytes.Compare([:],[:])
