Skip to content

Instantly share code, notes, and snippets.

@dzungtran
Last active November 14, 2023 06:35
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dzungtran/36a3156f03d4554a6aacf81680591425 to your computer and use it in GitHub Desktop.
Save dzungtran/36a3156f03d4554a6aacf81680591425 to your computer and use it in GitHub Desktop.
[Golang] Read a large CSV file with a worker pool
package main
// Sample file for test: https://drive.google.com/file/d/1DFkJdX5UTnB_xL7g8xwkkdE8BxdurAhN/view?usp=sharing
import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"sync"
"time"
)
// Sales holds one record of the sales CSV: string columns are kept verbatim
// and numeric columns are stored as int64/float64. The fields are declared in
// the same order in which parseStruct reads the CSV columns (0 through 13).
type Sales struct {
	Region        string  `json:"region"`
	Country       string  `json:"country"`
	ItemType      string  `json:"item_type"`
	SaleChannel   string  `json:"sale_channel"`
	OrderPriority string  `json:"order_priority"`
	OrderDate     string  `json:"order_date"`
	OrderId       int64   `json:"order_id"`
	ShipDate      string  `json:"ship_date"`
	UnitSold      int64   `json:"unit_sold"`
	UnitPrice     float64 `json:"unit_price"`
	UnitCost      float64 `json:"unit_cost"`
	TotalRevenue  float64 `json:"total_revenue"`
	TotalCost     float64 `json:"total_cost"`
	TotalProfit   float64 `json:"total_profit"`
}
// mu serializes writes to the result map shared by the goroutines that
// concuRS starts (one per CSV record).
var mu sync.Mutex
// main times the sequential reader against the worker-pool reader on the same
// CSV file and prints both durations. If the file cannot be opened it reports
// the error on stderr and exits with status 1.
func main() {
	if err := runBenchmarks("/path/to/file/1000k.csv"); err != nil {
		fmt.Fprintln(os.Stderr, "ERROR:", err)
		os.Exit(1)
	}
}

// runBenchmarks opens path twice, so that each reader consumes the file from
// its beginning through its own handle, runs basicRS and then concuRSwWP, and
// prints the elapsed time of each. It returns an error only when opening the
// file fails; read errors are reported by the readers themselves.
func runBenchmarks(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("basic read: %w", err)
	}
	defer f.Close()

	f1, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("concurrent read: %w", err)
	}
	defer f1.Close()

	ts := time.Now()
	// Alternative sequential variant: basicRead(f)
	basicRS(f)
	te := time.Since(ts)

	ts1 := time.Now()
	// Alternative concurrent variants: concuRead(f1), concuRS(f1)
	concuRSwWP(f1)
	te1 := time.Since(ts1)

	fmt.Println("\nEND Basic: ", te)
	fmt.Println("END Concu: ", te1)
	return nil
}
// concuRSwWP reads every CSV record from f, parses the records with a fixed
// pool of worker goroutines, and prints how many rows were parsed.
//
// Pipeline: one producer goroutine reads records and sends them on jobs; each
// worker converts records to *Sales with parseStruct and sends them on res;
// the calling goroutine collects from res. Results are appended in the order
// the workers deliver them, which is not necessarily file order.
//
// A read error other than io.EOF is printed and stops the producer; rows read
// before the error are still parsed and counted.
func concuRSwWP(f *os.File) {
	const numWps = 100

	fcsv := csv.NewReader(f)
	jobs := make(chan []string, numWps)
	res := make(chan *Sales)

	// Start the workers.
	var wg sync.WaitGroup
	wg.Add(numWps)
	for w := 0; w < numWps; w++ {
		go func() {
			// Done runs when this worker returns, i.e. once jobs has been
			// closed and fully drained.
			defer wg.Done()
			for job := range jobs {
				res <- parseStruct(job)
			}
		}()
	}

	// Producer: feed records to the workers, then close jobs to signal that
	// no more work is coming.
	go func() {
		defer close(jobs)
		for {
			rStr, err := fcsv.Read()
			if err == io.EOF {
				return
			}
			if err != nil {
				fmt.Println("ERROR: ", err.Error())
				return
			}
			jobs <- rStr
		}
	}()

	// Close res only after every worker has sent its last result; closing it
	// is what ends the collection loop below.
	go func() {
		wg.Wait()
		close(res)
	}()

	var rs []*Sales
	for r := range res {
		rs = append(rs, r)
	}
	fmt.Println("Count Concu ", len(rs))
}
// concuRS reads f record by record and starts one goroutine per record. Each
// goroutine parses its record with parseStruct and stores the result in a map
// keyed by OrderId while holding mu. Once every goroutine has finished, the
// number of map entries is printed; records that share an OrderId overwrite
// one another, so the count is the number of distinct ids.
//
// A read error other than io.EOF is printed and ends the read loop.
func concuRS(f *os.File) {
	reader := csv.NewReader(f)
	byID := make(map[int64]*Sales)
	var pending sync.WaitGroup

	for {
		record, err := reader.Read()
		if err != nil {
			if err != io.EOF {
				fmt.Println("ERROR: ", err.Error())
			}
			break
		}

		pending.Add(1)
		go func(row []string) {
			defer pending.Done()
			sale := parseStruct(row)
			mu.Lock()
			defer mu.Unlock()
			byID[sale.OrderId] = sale
		}(record)
	}

	pending.Wait()
	fmt.Println("Count Concu ", len(byID))
}
// basicRS reads f sequentially, parses every record with parseStruct into a
// slice, and prints the number of parsed rows. A read error other than io.EOF
// is printed and ends the loop; rows parsed before it are still counted.
func basicRS(f *os.File) {
	reader := csv.NewReader(f)
	sales := make([]*Sales, 0)

	for record, err := reader.Read(); err != io.EOF; record, err = reader.Read() {
		if err != nil {
			fmt.Println("ERROR: ", err.Error())
			break
		}
		sales = append(sales, parseStruct(record))
	}

	fmt.Println("Count Basic ", len(sales))
}
// basicRead reads f sequentially and hands every record to printData with the
// tag "BS". It stops at end of file, or at the first read error, which it
// prints.
func basicRead(f *os.File) {
	reader := csv.NewReader(f)
	for {
		record, err := reader.Read()
		switch {
		case err == io.EOF:
			return
		case err != nil:
			fmt.Println("ERROR: ", err.Error())
			return
		}
		printData(record, "BS")
	}
}
// concuRead reads f record by record and prints each record from its own
// goroutine via printData, tagged "CC". It returns only after every goroutine
// it started has finished, so no output is lost when the caller moves on or
// the program exits.
//
// A read error other than io.EOF is printed and ends the read loop; records
// already handed to goroutines are still printed.
func concuRead(f *os.File) {
	fcsv := csv.NewReader(f)
	var wg sync.WaitGroup
	for {
		rStr, err := fcsv.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("ERROR: ", err.Error())
			break
		}
		wg.Add(1)
		go func(row []string) {
			defer wg.Done()
			printData(row, "CC")
		}(rStr)
	}
	// Without this wait the goroutines could still be running (or not yet
	// scheduled) when the function returns.
	wg.Wait()
}
// printData parses one CSV record, marshals the result to JSON, and prints
// the job tag, the row's OrderId, and the length of the JSON encoding.
//
// json.Marshal rejects NaN and infinite floats, which strconv.ParseFloat
// accepts from input such as "NaN" or "Inf"; in that case the error is
// printed for the row instead of a misleading "len 0".
func printData(data []string, job string) {
	obj := parseStruct(data)
	js, err := json.Marshal(obj)
	if err != nil {
		fmt.Printf("\n[%v] ROW Id: %v - ERROR: %v", job, obj.OrderId, err)
		return
	}
	fmt.Printf("\n[%v] ROW Id: %v - len %v", job, obj.OrderId, len(js))
}
// parseStruct converts one CSV record into a Sales value.
//
// Columns are read by position: 0-5 and 7 are copied as strings, 6 and 8 are
// parsed as base-10 int64, and 9-13 are parsed as float64. A column that is
// missing because the record has fewer than 14 fields is treated as an empty
// string rather than causing an index-out-of-range panic.
//
// Numeric parse errors are ignored and the value strconv returned alongside
// the error is used: 0 for empty or malformed text (for example a header
// row), or the nearest representable value for out-of-range input.
//
// The result is never nil.
func parseStruct(data []string) *Sales {
	// col returns field i, or "" when the record is too short to contain it.
	col := func(i int) string {
		if i < len(data) {
			return data[i]
		}
		return ""
	}
	// Errors are dropped on purpose so that one bad cell does not discard
	// the whole row; see the function comment for the resulting values.
	intCol := func(i int) int64 {
		v, _ := strconv.ParseInt(col(i), 10, 64)
		return v
	}
	floatCol := func(i int) float64 {
		v, _ := strconv.ParseFloat(col(i), 64)
		return v
	}

	return &Sales{
		Region:        col(0),
		Country:       col(1),
		ItemType:      col(2),
		SaleChannel:   col(3),
		OrderPriority: col(4),
		OrderDate:     col(5),
		OrderId:       intCol(6),
		ShipDate:      col(7),
		UnitSold:      intCol(8),
		UnitPrice:     floatCol(9),
		UnitCost:      floatCol(10),
		TotalRevenue:  floatCol(11),
		TotalCost:     floatCol(12),
		TotalProfit:   floatCol(13),
	}
}
@actforgood
Copy link

For anyone interested in the subject, you can also check https://github.com/actforgood/bigcsvreader .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment