Created
March 16, 2024 20:58
-
-
Save abihf/6e339d8dcc75d9d3a7474800bea25b7b to your computer and use it in GitHub Desktop.
Golang Implementation for the 1 Billion Row Challenge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"errors" | |
"fmt" | |
"hash/maphash" | |
"io" | |
"os" | |
"runtime/pprof" | |
"slices" | |
"sync" | |
"sync/atomic" | |
) | |
// Sizing knobs for the reader/worker pipeline.
const (
	// bufSize is how many bytes main asks the input file for per chunk.
	bufSize = 16 * 1024 * 1024 // 16 MiB
	// workserSize is the number of worker goroutines, and also the buffer
	// depth of the result channel.
	workserSize = 23
	// maxLineLen bounds the unterminated tail carried from one chunk into the
	// next; every pooled buffer reserves this much extra room for it.
	maxLineLen = 128
)

var (
	// bufPool recycles chunk buffers between main (reader) and the workers.
	bufPool = sync.Pool{New: func() any {
		return make([]byte, bufSize+maxLineLen)
	}}
	// hashSeed is shared by all workers so that equal station names hash to
	// equal keys in every worker's result map.
	hashSeed = maphash.MakeSeed()
	// totalLine accumulates the number of records parsed by all workers.
	totalLine atomic.Uint64
)
// tempData accumulates the statistics of one weather station. Temperatures
// are stored as integer tenths of a degree, the unit produced by btoi.
//
// name holds the station name, zero-padded: the padding marks where the name
// ends and makes whole-array byte comparison sort names lexicographically.
// The 1BRC input rules allow station names of up to 100 bytes (UTF-8), so
// the array is sized for that. A smaller array silently truncates longer
// names (possibly mid-character), and a name that fills it has no NUL
// terminator.
type tempData struct {
	sum      int64  // running sum, tenths of a degree
	count    uint32 // number of samples
	min, max int16  // extremes, tenths of a degree
	name     [100]byte
}
func main() { | |
if os.Getenv("PPROF") == "1" { | |
profFile, _ := os.Create("cpu.prof") | |
defer profFile.Close() | |
pprof.StartCPUProfile(profFile) | |
defer pprof.StopCPUProfile() | |
} | |
resCh := make(chan map[uint64]*tempData, workserSize) | |
bulkCh := make(chan []byte, workserSize*2) | |
for i := 0; i < workserSize; i++ { | |
go worker(bulkCh, resCh) | |
} | |
file, _ := os.Open("input.csv") | |
// bufR := bufio.NewReader(file) | |
var restBytes [maxLineLen]byte | |
restLen := 0 | |
for { | |
buf := bufPool.Get().([]byte) | |
n, err := file.Read(buf[restLen : restLen+bufSize]) | |
if err != nil { | |
if !errors.Is(err, io.EOF) { | |
panic(err) | |
} | |
bufPool.Put(buf) | |
break | |
} | |
if n == 0 { | |
bufPool.Put(buf) | |
continue | |
} | |
if restLen > 0 { | |
copy(buf, restBytes[:restLen]) | |
n += restLen | |
} | |
lastLn := bytes.LastIndexByte(buf[:n], '\n') + 1 | |
if lastLn < n { | |
restLen = copy(restBytes[:], buf[lastLn:n]) | |
} else { | |
restLen = 0 | |
} | |
bulkCh <- buf[:lastLn] | |
} | |
file.Close() | |
close(bulkCh) | |
merged := make(map[uint64]*tempData, 10_000) | |
for i := 0; i < workserSize; i++ { | |
dataMap := <-resCh | |
for id, v := range dataMap { | |
if md, ok := merged[id]; ok { | |
md.Merge(v.sum, v.count, v.min, v.max) | |
// data.sum += values.sum | |
// data.count += values.count | |
// if values.min < data.min { | |
// data.min = values.min | |
// } | |
// if values.max > data.max { | |
// data.max = values.max | |
// } | |
} else { | |
merged[id] = v | |
} | |
} | |
} | |
list := make([]*tempData, 0, len(merged)) | |
for _, v := range merged { | |
list = append(list, v) | |
} | |
merged = nil | |
slices.SortFunc(list, func(a, b *tempData) int { | |
return a.Compare(b) | |
}) | |
outFile, _ := os.Create("output.csv") | |
w := bufio.NewWriterSize(outFile, 4*1024*1024) | |
defer outFile.Close() | |
for _, v := range list { | |
v.Print(w) | |
} | |
w.Flush() | |
println("Processed", totalLine.Load(), "lines") | |
} | |
func worker(ch chan []byte, resCh chan map[uint64]*tempData) { | |
res := make(map[uint64]*tempData, 10000) | |
lines := uint64(0) | |
for origBuf := range ch { | |
buf := origBuf | |
for { | |
scIdx := bytes.IndexByte(buf, ';') | |
if scIdx < 0 { | |
break | |
} | |
city := buf[:scIdx] | |
buf = buf[scIdx+1:] | |
lnIdx := bytes.IndexByte(buf, '\n') | |
val := btoi(buf[:lnIdx]) | |
buf = buf[lnIdx+1:] | |
lines++ | |
hash := maphash.Bytes(hashSeed, city) | |
data, ok := res[hash] | |
if !ok { | |
res[hash] = &tempData{sum: int64(val), min: val, max: val, count: 1} | |
copy(res[hash].name[:], city) | |
} else { | |
// if bytes.Compare(data.name[:len(city)], city) != 0 { | |
// panic("city name collision") | |
// } | |
// data.Merge(int64(val), val, val, 1) | |
data.sum += int64(val) | |
data.count++ | |
if val < data.min { | |
data.min = val | |
} | |
if val > data.max { | |
data.max = val | |
} | |
} | |
} | |
bufPool.Put(origBuf) | |
} | |
totalLine.Add(lines) | |
resCh <- res | |
} | |
// btoi parses a temperature such as "-12.3" into an int16 by dropping the
// decimal point. When the text has exactly one fractional digit, the result
// is in tenths of a degree ("-12.3" -> -123, "5.0" -> 50).
//
// It performs no validation. b must be non-empty, also after an optional
// leading '-'. The first byte after the sign is always treated as a digit,
// as is every later byte other than '.'.
func btoi(b []byte) int16 {
	negative := b[0] == '-'
	if negative {
		b = b[1:]
	}
	tenths := int16(b[0] - '0')
	for i := 1; i < len(b); i++ {
		switch d := b[i]; d {
		case '.':
			// The decimal point only separates digits; it adds no value.
		default:
			tenths = tenths*10 + int16(d-'0')
		}
	}
	if negative {
		return -tenths
	}
	return tenths
}
func (t *tempData) Print(w io.Writer) { | |
name := t.name[:bytes.IndexByte(t.name[:], 0)] | |
fmt.Fprintf(w, "%s;%.1f;%.1f;%.1f\n", name, float64(t.sum)/float64(t.count)/10, float32(t.min)/10, float32(t.max)/10) | |
} | |
//go:inline | |
func (t *tempData) Merge(sum int64, count uint32, min, max int16) { | |
t.sum += sum | |
t.count += count | |
if min < t.min { | |
t.min = min | |
} | |
if max > t.max { | |
t.max = max | |
} | |
} | |
func (t *tempData) Compare(b *tempData) int { | |
return bytes.Compare(t.name[:], b.name[:]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment