Skip to content

Instantly share code, notes, and snippets.

@corlinp
Last active March 5, 2024 07:24
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save corlinp/176a97c58099bca36bcd5679e68f9708 to your computer and use it in GitHub Desktop.
Save corlinp/176a97c58099bca36bcd5679e68f9708 to your computer and use it in GitHub Desktop.
1BRC in Go - corlinp
/*
Corlin Palmer's Go solution to the 1BRC coding challenge: https://github.com/gunnarmorling/1brc
- This solution reads the file sequentially as fast as possible (reader)
- It passes off the job of ensuring that each chunk ends with a complete line to another goroutine (lineSupervisor)
- The lineSupervisor sends valid chunks to a pool of worker goroutines (worker) which parse the data and calculate the results
- The results from the workers are collected in a map and then sorted before printing the final results
A fair amount of optimization has been done to reduce memory allocations.
Still, it's currently 2X faster than the best Java implementation (on my machine, a Macbook M3 Pro).
Script Time (s)
calculate_average_corlinp.sh 5.092
calculate_average_warpspeedlabs.sh 8.183 (Go)
calculate_average_ddimtirov.sh 10.183
calculate_average_ebarlas.sh 11.625
calculate_average_royvanrijn.sh 11.970
calculate_average_AlexanderYastrebov.sh 14.114 (Go)
calculate_average_filiphr.sh 15.038
calculate_average_palmr.sh 15.359
calculate_average_spullara.sh 16.350
calculate_average_seijikun.sh 20.214
calculate_average_padreati.sh 22.460
calculate_average_richardstartin.sh 22.496
calculate_average_bjhara.sh 23.166
calculate_average_criccomini.sh 23.378
calculate_average_truelive.sh 25.741
calculate_average_khmarbaise.sh 41.079
calculate_average_kuduwa-keshavram.sh 45.390
calculate_average_itaske.sh 50.250
calculate_average.sh 162.420
For some fun comparisons, it's even faster than wc -l, but still slower than piping it to /dev/null:
cat measurements.txt > /dev/null 3.522
wc -l measurements.txt 11.892
*/
package main
import (
"bytes"
"fmt"
"io"
"os"
"sort"
"sync"
)
// reader reads raw chunks from the file and sends them to the rawChunks channel.
func reader(file *os.File, rawChunks chan<- []byte) {
const chunkSize = 256 * 1024
buf := make([]byte, chunkSize)
for {
bytesRead, err := file.Read(buf)
if bytesRead > 0 {
chunk := make([]byte, bytesRead)
copy(chunk, buf[:bytesRead])
rawChunks <- chunk
}
if err != nil {
if err != io.EOF {
fmt.Printf("Error reading file: %v\n", err)
}
break
}
}
close(rawChunks)
}
// processChunk processes a single chunk and returns the valid part and the leftover part.
// processChunk splits chunk into a run of complete lines (validPart) and the
// trailing partial line (newLeftover). leftover holds the partial line carried
// over from the previous chunk and is prepended to validPart.
//
// Bug fix: the original special-cased the first newline and, whenever a chunk
// contained exactly one '\n' (firstNewline == lastNewline), returned the
// cleared leftover as newLeftover — silently dropping every byte after that
// newline. Splitting on the last newline alone handles all cases correctly.
func processChunk(chunk, leftover []byte) (validPart, newLeftover []byte) {
	lastNewline := bytes.LastIndexByte(chunk, '\n')
	if lastNewline == -1 {
		// No complete line in this chunk: carry everything forward.
		return nil, append(leftover, chunk...)
	}
	// Everything up to and including the last newline is complete lines.
	validPart = append(leftover, chunk[:lastNewline+1]...)
	// The dangling tail (possibly empty) becomes the next carry-over.
	// newLeftover starts nil, so append allocates a fresh backing array and
	// never aliases validPart, which will be sent to another goroutine.
	newLeftover = append(newLeftover, chunk[lastNewline+1:]...)
	return validPart, newLeftover
}
// lineSupervisor takes raw chunks, ensures they end with complete lines, and sends valid chunks to the workers.
// lineSupervisor re-frames raw chunks on line boundaries: every slice it sends
// on validChunks ends with '\n' (except possibly the final flush when the file
// lacks a trailing newline), so workers never see a line split across chunks.
func lineSupervisor(rawChunks <-chan []byte, validChunks chan<- []byte) {
	var carry []byte // partial trailing line from the previous chunk
	for raw := range rawChunks {
		complete, rest := processChunk(raw, carry)
		carry = rest
		if len(complete) == 0 {
			continue
		}
		validChunks <- complete
	}
	// Flush whatever remains (a final line without a trailing newline).
	if len(carry) > 0 {
		validChunks <- carry
	}
	close(validChunks)
}
// CityName is a fixed-size array key so the map can hash it without allocating.
type CityName [64]byte

// CityTemp aggregates one city's readings (temperatures in tenths of a degree):
// [0] = sum of all temperatures
// [1] = number of temperatures
// [2] = min temperature
// [3] = max temperature
type CityTemp *[4]int64

// worker parses the lines in each chunk ("City;-12.3\n") and accumulates
// sum/count/min/max per city, sending its private map once the channel drains.
//
// Fixes vs the original:
//   - On first sight of a city, temp had already been negated at '\n', and the
//     old code negated it a second time when seeding min/max — so a city whose
//     first reading was negative started with a positive min/max. Seed with
//     temp directly.
//   - The name-length cutoff was 44 while CityName holds 64 bytes; bytes 45..64
//     of a long name fell through to the digit parser and corrupted temp. Use
//     the full array length.
//
// NOTE(review): a '-' inside a city name (e.g. "Baden-Baden") is consumed by
// the '-' case and never stored, as in the original — presumably acceptable
// for this data set; confirm against the input.
func worker(validChunks <-chan []byte, results chan<- map[CityName]CityTemp) {
	cityTemps := make(map[CityName]CityTemp, 512)
	var city CityName
	cityLen := 0
	var temp int64
	var isNegative bool
	for chunk := range validChunks {
		for _, b := range chunk {
			switch b {
			case ';': // Delimiter between city and temperature
				cityLen = len(city) // sentinel: route remaining bytes to the digit parser
				isNegative = false
			case '\n':
				if isNegative {
					temp = -temp
				}
				ct := cityTemps[city]
				if ct == nil {
					// temp is already correctly signed; use it as-is for min and max.
					cityTemps[city] = &[4]int64{temp, 1, temp, temp}
				} else {
					ct[0] += temp
					ct[1]++
					if temp < ct[2] {
						ct[2] = temp
					}
					if temp > ct[3] {
						ct[3] = temp
					}
				}
				// Reset for the next line
				city = CityName{}
				cityLen = 0
				temp = 0
				isNegative = false
			case '.':
				continue // Skip the decimal point; all numbers have exactly 1 decimal place
			case '-':
				isNegative = true // Temperature is negative
			default:
				if cityLen < len(city) {
					city[cityLen] = b
					cityLen++
				} else {
					// Inline parsing of temperature digits
					temp = temp*10 + int64(b-'0')
				}
			}
		}
	}
	results <- cityTemps
}
// main wires the pipeline reader -> lineSupervisor -> N workers -> merge,
// then sorts city names bytewise and prints "{City=min/mean/max, ...}".
func main() {
	if len(os.Args) < 2 {
		fmt.Println("Error: First arg should be the measurements.txt file path")
		return
	}
	filePath := os.Args[1]
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("Error opening file: %v\n", err)
		return
	}
	defer file.Close()

	numWorkers := 256
	rawChunks := make(chan []byte, 16384)                   // raw chunks from the file
	validChunks := make(chan []byte, 16384)                 // chunks ending on a complete line
	results := make(chan map[CityName]CityTemp, numWorkers) // per-worker result maps

	// Start the reader and the line supervisor.
	go reader(file, rawChunks)
	go lineSupervisor(rawChunks, validChunks)

	// Start worker goroutines.
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(validChunks, results)
		}()
	}
	// Close results once every worker has sent its map.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Merge the per-worker maps.
	finalResults := make(map[CityName]CityTemp)
	for cityTemps := range results {
		for city, ct := range cityTemps {
			finalCt := finalResults[city]
			if finalCt == nil {
				// BUG FIX: the original seeded a zeroed entry and then merged
				// min/max by comparing against 0, so any city whose true min
				// was positive (or true max negative) reported 0.0. Seed the
				// entry with a copy of the first worker's stats instead.
				cp := *ct
				finalResults[city] = &cp
				continue
			}
			finalCt[0] += ct[0]
			finalCt[1] += ct[1]
			if ct[2] < finalCt[2] {
				finalCt[2] = ct[2]
			}
			if ct[3] > finalCt[3] {
				finalCt[3] = ct[3]
			}
		}
	}

	// Sort city names bytewise for deterministic output.
	allCities := make([]CityName, 0, len(finalResults))
	for city := range finalResults {
		allCities = append(allCities, city)
	}
	sort.Slice(allCities, func(i, j int) bool {
		return bytes.Compare(allCities[i][:], allCities[j][:]) < 0
	})

	// Print the final results. Stored values are tenths of a degree, so
	// divide by 10 when formatting.
	fmt.Print("{")
	for i, city := range allCities {
		ct := finalResults[city]
		// BUG FIX: CityName is NUL-padded to 64 bytes; trim the padding so
		// the printed name does not contain trailing NUL characters.
		name := city[:]
		if z := bytes.IndexByte(name, 0); z != -1 {
			name = name[:z]
		}
		fmt.Printf("%s=%.1f/%.1f/%.1f", name, float64(ct[2])/10, float64(ct[0])/float64(ct[1])/10, float64(ct[3])/10)
		if i < len(allCities)-1 {
			fmt.Print(", ")
		}
	}
	fmt.Println("}")
}
@corlinp
Copy link
Author

corlinp commented Jan 5, 2024

The following uses a hash function that is highly dangerous and overfitted to the list of cities. I iterated a bunch to find some magic numbers that make a small lookup table.

hashResult := 5381
for _, num := range city[:9] {
	hashResult = ((hashResult << 3) + hashResult) + int(num)
}
return hashResult % 11072

It runs in a whopping 3.746s

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"sync"
)

// reader reads raw chunks from the file and sends them to the rawChunks channel.
func reader(file *os.File, rawChunks chan<- []byte) {
	const chunkSize = 256 * 1024
	buf := make([]byte, chunkSize)

	for {
		bytesRead, err := file.Read(buf)
		if bytesRead > 0 {
			chunk := make([]byte, bytesRead)
			copy(chunk, buf[:bytesRead])
			rawChunks <- chunk
		}
		if err != nil {
			if err != io.EOF {
				fmt.Printf("Error reading file: %v\n", err)
			}
			break
		}
	}
	close(rawChunks)
}

// processChunk processes a single chunk and returns the valid part and the leftover part.
// processChunk splits chunk into a run of complete lines (validPart) and the
// trailing partial line (newLeftover). leftover is the partial line carried
// over from the previous chunk and is prepended to validPart.
//
// Bug fix: the original special-cased the first newline and, whenever a chunk
// contained exactly one '\n' (firstNewline == lastNewline), returned the
// cleared leftover as newLeftover — silently dropping every byte after that
// newline. Splitting on the last newline alone handles all cases correctly.
func processChunk(chunk, leftover []byte) (validPart, newLeftover []byte) {
	lastNewline := bytes.LastIndexByte(chunk, '\n')
	if lastNewline == -1 {
		// No complete line in this chunk: carry everything forward.
		return nil, append(leftover, chunk...)
	}

	// Everything up to and including the last newline is complete lines.
	validPart = append(leftover, chunk[:lastNewline+1]...)

	// The dangling tail (possibly empty) becomes the next carry-over.
	// newLeftover starts nil, so append allocates a fresh backing array and
	// never aliases validPart, which will be sent to another goroutine.
	newLeftover = append(newLeftover, chunk[lastNewline+1:]...)

	return validPart, newLeftover
}

// lineSupervisor takes raw chunks, ensures they end with complete lines, and sends valid chunks to the workers.
// lineSupervisor re-chunks the raw byte stream on line boundaries so that
// every slice delivered to validChunks ends on a complete line (the final
// flush may lack a trailing '\n' if the file does).
func lineSupervisor(rawChunks <-chan []byte, validChunks chan<- []byte) {
	carry := []byte{} // dangling partial line from the previous chunk

	for raw := range rawChunks {
		var complete []byte
		complete, carry = processChunk(raw, carry)
		if len(complete) > 0 {
			validChunks <- complete
		}
	}

	// The file may not end with '\n'; flush the remainder as a final chunk.
	if len(carry) > 0 {
		validChunks <- carry
	}

	close(validChunks)
}

// Extremely overfitted for this specific data set - the first 9 bytes is all we need to identify a city
type CityName [9]byte

// CityTemp aggregates one city's readings (temperatures in tenths of a degree):
// [0] = sum of all temperatures
// [1] = number of temperatures
// [2] = min temperature
// [3] = max temperature
type CityTemp [4]int64

// CityLUT is a flat lookup table indexed by the overfitted city-name hash.
type CityLUT *[11072]CityTemp

// worker parses lines and accumulates per-city stats in a hash-indexed table.
//
// Fixes vs the original:
//   - `ct := cityTemps[hashResult]` copied the [4]int64 value, so every update
//     was applied to a throwaway copy and the table stayed all-zero. Take a
//     pointer into the array instead.
//   - The max branch tested `ct[2] == 0` (a copy-paste of the min branch); it
//     must test ct[3], the max slot.
func worker(validChunks <-chan []byte, results chan<- CityLUT) {
	cityTemps := new([11072]CityTemp)
	var city CityName
	cityLen := 0
	var temp int64
	var isNegative bool

	for chunk := range validChunks {
		for _, b := range chunk {
			switch b {
			case ';': // Delimiter between city and temperature
				cityLen = 64 // sentinel larger than len(city): remaining bytes are digits
				isNegative = false
			case '\n':
				if isNegative {
					temp = -temp
				}
				// here's an overfitted djb2-type hash that works well for this data set and fits the cities into 11072 buckets
				hashResult := 5381
				for _, num := range city {
					hashResult = ((hashResult << 3) + hashResult) + int(num)
				}
				hashResult = hashResult % 11072
				// Pointer into the table so the updates below actually stick.
				ct := &cityTemps[hashResult]
				ct[0] += temp
				ct[1]++
				// note this might give you the wrong answer if the min or max is 0
				if temp < ct[2] || ct[2] == 0 {
					ct[2] = temp
				}
				if temp > ct[3] || ct[3] == 0 {
					ct[3] = temp
				}
				// Reset for the next line
				city = CityName{}
				cityLen = 0
				temp = 0
				isNegative = false
			case '.':
				continue // Skip the decimal point, we know all numbers have 1 decimal place
			case '-':
				isNegative = true // Next number will be negative
			default:
				if cityLen < 9 {
					city[cityLen] = b
					cityLen++
				} else {
					// Inline parsing of temperature
					temp = temp*10 + int64(b-'0')
				}
			}
		}
	}

	results <- cityTemps
}

// main wires the pipeline (reader -> lineSupervisor -> N workers) and merges
// the per-worker lookup tables into finalResults.
//
// NOTE(review): as posted, this main merges results but never prints them —
// the sort/format/output stage of the full program appears to have been
// omitted from this snippet; confirm against the complete gist.
func main() {
	if len(os.Args) < 2 {
		fmt.Println("Error: First arg should be the measurements.txt file path")
		return
	}

	filePath := os.Args[1]
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("Error opening file: %v\n", err)
		return
	}
	defer file.Close()

	numWorkers := 256
	rawChunks := make(chan []byte, 16384)     // buffered channel for raw chunks from the file
	validChunks := make(chan []byte, 16384)   // buffered channel for valid chunks ending with a complete line
	results := make(chan CityLUT, numWorkers) // buffered channel for results from the workers

	// Start the reader
	go reader(file, rawChunks)

	// Start the line supervisor
	go lineSupervisor(rawChunks, validChunks)

	var wg sync.WaitGroup

	// Start worker goroutines
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(validChunks, results)
		}()
	}

	// Collect results: close the results channel once all workers are done.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Merge each worker's lookup table bucket-by-bucket.
	finalResults := new([11072]CityTemp)
	for cityTemps := range results {
		for city, ct := range cityTemps {
			// Skip empty buckets. NOTE(review): this also skips a real city
			// whose temperature sum happens to be exactly 0 — consistent with
			// the zero-sentinel overfit acknowledged in the worker.
			if ct[0] == 0 {
				continue
			}
			// finalCt is a value copy of the [4]int64; it is written back
			// below, so the mutations are not lost.
			finalCt := finalResults[city]
			finalCt[0] += ct[0]
			finalCt[1] += ct[1]
			// NOTE(review): a fresh finalCt starts at zero, so a city whose
			// true min is positive (or true max negative) keeps 0 here —
			// same min/max-of-0 caveat as in the worker; verify acceptable.
			if ct[2] < finalCt[2] {
				finalCt[2] = ct[2]
			}
			if ct[3] > finalCt[3] {
				finalCt[3] = ct[3]
			}
			finalResults[city] = finalCt
		}
	}
}

@eddbot
Copy link

eddbot commented Jan 5, 2024

Hey @corlinp just wanted to say thanks for sharing your approach, learning a lot from the practical application of concurrency primitives here 🙌

@arhyth
Copy link

arhyth commented Jan 16, 2024

Thanks for sharing! I was looking for an implementation that was fast AND importantly, still approachable. Yours perfectly fit the bill. Kudos!

@arhyth
Copy link

arhyth commented Mar 4, 2024

Hey @corlinp! Not sure if you're as comfortable in Rust as in Go, but if by any chance you get to look at this port[1] of your implementation. I tried to make use of all the same "mechanisms" you used in yours but it's a bit slow, like 2 orders of magnitude slow. Would really love to know what makes your Go implementation tick. Thanks!

[1] https://github.com/arhyth/rubric/blob/main/src/main.rs

@corlinp
Copy link
Author

corlinp commented Mar 4, 2024

@arhyth honored that you wanted to replicate my approach! Unfortunately I'm not too familiar with Rust, so might be totally off base here, but you may want to take a look at using thread pools instead of so much async code? Good luck! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment