1brc-go-sol
@tangledbytes, created January 23, 2024
package main

import (
    "bytes"
    "fmt"
    "math"
    "os"
    "os/signal"
    "runtime"
    "runtime/pprof"
    "sort"
    "sync"
    "syscall"
    "unsafe"
)
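// TABLE_SIZE is the fixed slot count of the open-addressing hash table
// below. It just needs to comfortably exceed the number of distinct station
// names (the 1BRC rules allow at most 10,000) so that linear probing stays
// short; 98317 is a prime commonly used for hash-table sizing.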
const TABLE_SIZE = 98317

//go:linkname mmap syscall.mmap
func mmap(addr uintptr, length uintptr, prot int, flags int, fd int, offset int64) (raddr uintptr, err error)

//go:linkname munmap syscall.munmap
func munmap(addr uintptr, length uintptr) (err error)
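// Note: the body-less declarations above are resolved via go:linkname to the
// unexported syscall implementations; the Go toolchain normally also expects
// an (empty) assembly file in the package before it accepts function
// declarations without bodies.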
// FullData pairs a station name with its aggregated data, used when sorting
// the final output.
type FullData struct {
    name string
    data *Data
}

// Data is the running aggregate for a single station.
type Data struct {
    Min   float64
    Max   float64
    Sum   float64
    Count int64
}

// _Map is a fixed-size, open-addressing hash map keyed by station name.
type _Map struct {
    slots [TABLE_SIZE]*_MapData
}

type _MapData struct {
    key   []byte
    value *Data
    hash  uint32
}
func NewMap() *_Map {
    return &_Map{
        slots: [TABLE_SIZE]*_MapData{},
    }
}
// Add inserts or overwrites the value for key k using linear probing.
func (mp *_Map) Add(k []byte, v *Data) {
    hash := mp.shash(k)
    slotIdx := hash % TABLE_SIZE
    prober := slotIdx
    for {
        slot := mp.slots[prober]
        if slot == nil {
            // Claim the probed slot, not necessarily the home slot.
            mp.slots[prober] = &_MapData{
                key:   k,
                value: v,
                hash:  hash,
            }
            return
        }
        if slot.hash == hash && bytes.Equal(slot.key, k) {
            slot.value = v
            return
        }
        // mp.amiss++
        prober = (prober + 1) % TABLE_SIZE
        if prober == slotIdx {
            break
        }
    }
    panic("hash table is full")
}
// Get returns the value stored for key k, or nil if it is absent.
func (mp *_Map) Get(k []byte) *Data {
    hash := mp.shash(k)
    slotIdx := hash % TABLE_SIZE
    prober := slotIdx
    for {
        slot := mp.slots[prober]
        if slot == nil {
            return nil
        }
        if slot.hash == hash && bytes.Equal(slot.key, k) {
            return slot.value
        }
        prober = (prober + 1) % TABLE_SIZE
        if prober == slotIdx {
            break
        }
    }
    return nil
}
// Range calls fn for every occupied slot.
func (mp *_Map) Range(fn func(k []byte, v *Data)) {
    for _, slot := range mp.slots {
        if slot == nil {
            continue
        }
        fn(slot.key, slot.value)
    }
}
// shash is the 32-bit FNV-1a hash of k.
func (mp *_Map) shash(k []byte) uint32 {
    v := uint32(2166136261)
    for i := 0; i < len(k); i++ {
        v ^= uint32(k[i])
        v *= 16777619
    }
    return v
}
func clusteredProcess(file string, mp *_Map) {
    wg := &sync.WaitGroup{}
    cpus := runtime.NumCPU()

    stores := make([]*_Map, cpus)
    for i := range stores {
        stores[i] = NewMap()
    }

    // data := loadFile(file)
    stat, err := os.Stat(file)
    if err != nil {
        panic(err)
    }

    dataSize := int(stat.Size())
    sizePerCPU := dataSize / cpus
    remains := dataSize % cpus

    for i := 0; i < cpus; i++ {
        wg.Add(1)
        go func(i int) {
            size := sizePerCPU
            // The last worker also takes the leftover bytes.
            if i == cpus-1 {
                size += remains
            }
            data := loadFile(file)
            consumeChunk(data, i*sizePerCPU, size, stores[i])
            // println("add_miss:", i, stores[i].amiss)
            wg.Done()
        }(i)
    }
    wg.Wait()

    // Merge the per-CPU results into the final map.
    for i := 0; i < cpus; i++ {
        stores[i].Range(func(k []byte, v *Data) {
            data := mp.Get(k)
            if data == nil {
                mp.Add(k, &Data{
                    Min:   v.Min,
                    Max:   v.Max,
                    Sum:   v.Sum,
                    Count: v.Count,
                })
                return
            }
            if v.Min < data.Min {
                data.Min = v.Min
            }
            if v.Max > data.Max {
                data.Max = v.Max
            }
            data.Sum += v.Sum
            data.Count += v.Count
        })
    }
}
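// consumeChunk parses roughly size bytes of data beginning at chunkOffset.
// A chunk may start in the middle of a line, so the worker first skips ahead
// to the first byte following a '\n'; the straddling line itself is handled
// by the previous worker, whose loop only stops after finishing a whole line.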
func consumeChunk(data []byte, chunkOffset, size int, mp *_Map) {
    // 1. Find the start point: if the chunk begins mid-line, advance to the
    // first byte that follows a '\n'.
    var start int
    if chunkOffset == 0 {
        start = 0
    } else {
        for {
            if data[chunkOffset-1] == '\n' {
                start = chunkOffset
                break
            }
            chunkOffset++
        }
    }

    // 2. Parse line by line until this chunk's byte budget is consumed.
    readptr := start
    for readptr-start < size {
        lineStart, lineEnd := readNewLine(data, readptr)
        if lineStart == -1 {
            break
        }
        readptr = lineEnd + 1

        // Process the line; lineEnd is the '\n', which is excluded
        // (assumes '\n'-only line endings, no trailing '\r').
        _process(data[lineStart:lineEnd], mp)
    }
}
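// loadFile mmaps the entire file read-only and returns it as a byte slice.
// Every worker calls it independently, so the file is mapped once per
// goroutine, and the mapping is never munmap'ed; it simply lives until the
// process exits.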
func loadFile(name string) []byte {
    file, err := os.Open(name)
    if err != nil {
        panic(err)
    }
    stat, err := file.Stat()
    if err != nil {
        panic(err)
    }
    addr, err := mmap(0, uintptr(stat.Size()), syscall.PROT_READ, syscall.MAP_PRIVATE, int(file.Fd()), 0)
    if err != nil {
        panic(err)
    }
    return unsafe.Slice((*byte)(unsafe.Pointer(addr)), stat.Size())
}
// readNewLine returns (start, end) for the line beginning at offset, where
// end is the index of the terminating '\n'; it returns (-1, -1) if no
// newline is found.
func readNewLine(byt []byte, offset int) (int, int) {
    for i := offset; i < len(byt); i++ {
        if byt[i] == '\n' {
            return offset, i
        }
    }
    return -1, -1
}
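// _process handles a single "<station>;<value>" line: it locates the ';'
// delimiter, parses the value, and folds it into the per-station aggregate
// (Min, Max, Sum, Count) stored in mp.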
func _process(byt []byte, mp *_Map) {
    // Find the ';' separating the station name from the measurement.
    delimLoc := 0
    for i := range byt {
        if byt[i] == ';' {
            delimLoc = i
        }
    }
    place := byt[:delimLoc]
    val := byteToFloat(byt[delimLoc+1:])

    data := mp.Get(place)
    if data == nil {
        mp.Add(place, &Data{
            Min:   val,
            Max:   val,
            Sum:   val,
            Count: 1,
        })
    } else {
        if val < data.Min {
            data.Min = val
        }
        if val > data.Max {
            data.Max = val
        }
        data.Sum += val
        data.Count += 1
    }
}
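// byteToFloat is a minimal parser for values such as "-12.3": it accumulates
// the digits as an integer (123), divides by a power of ten derived from the
// position of the '.', and finally applies the sign. It assumes well-formed
// input with at most one decimal point.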
func byteToFloat(byt []byte) float64 {
    isNeg := byt[0] == '-'
    decPos := -1
    num := 0.0
    for i, val := range byt {
        if i == 0 && isNeg {
            continue
        }
        if val == '.' {
            decPos = i
            continue
        }
        digit := val - '0'
        num = (num * 10) + float64(digit)
    }
    // Scale down by the number of digits after the decimal point.
    if decPos != -1 {
        num /= math.Pow10((len(byt) - 1) - decPos)
    }
    if isNeg {
        return num * -1
    }
    return num
}
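// getSortedStore and printSortedStore produce the final output: stations
// sorted alphabetically and printed as "{ name=min/mean/max, ... }" with one
// decimal place, where mean is Sum/Count.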
func getSortedStore(store *_Map) []*FullData {
    data := make([]*FullData, 0, TABLE_SIZE)
    store.Range(func(key []byte, value *Data) {
        data = append(data, &FullData{
            name: string(key),
            data: value,
        })
    })
    sort.Slice(data, func(i, j int) bool {
        return data[i].name < data[j].name
    })
    return data
}

func printSortedStore(sortedstore []*FullData) {
    fmt.Print("{ ")
    for i, v := range sortedstore {
        fmt.Printf("%s=%.1f/%.1f/%.1f", v.name, v.data.Min, v.data.Sum/float64(v.data.Count), v.data.Max)
        if i != len(sortedstore)-1 {
            fmt.Printf(", ")
        }
    }
    fmt.Print(" }")
}
// setupCPUProfile starts writing a CPU profile to cpu.pprof when the
// CPU_PROFILE environment variable is set to "1".
func setupCPUProfile() {
    if os.Getenv("CPU_PROFILE") == "1" {
        f, err := os.Create("cpu.pprof")
        if err != nil {
            panic(err)
        }
        if err := pprof.StartCPUProfile(f); err != nil {
            panic(err)
        }
    }
}

func stopCPUProfile() {
    if os.Getenv("CPU_PROFILE") == "1" {
        pprof.StopCPUProfile()
    }
}

// exitHooks runs the given functions, then exits, when the process receives
// an interrupt signal.
func exitHooks(fns ...func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    go func() {
        <-c
        for _, fn := range fns {
            fn()
        }
        os.Exit(1)
    }()
}
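// main wires everything together: it optionally starts a CPU profile
// (CPU_PROFILE=1 writes cpu.pprof), installs an interrupt hook that stops the
// profile, processes ./measurements.txt, and prints the sorted result.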
func main() {
    setupCPUProfile()
    exitHooks(
        stopCPUProfile,
    )

    mp := NewMap()
    clusteredProcess("./measurements.txt", mp)
    printSortedStore(getSortedStore(mp))

    stopCPUProfile()
}