Skip to content

Instantly share code, notes, and snippets.

@lateefj
Last active March 10, 2016 07:19
Show Gist options
  • Save lateefj/9c94b01b17d344d84f30 to your computer and use it in GitHub Desktop.
Save lateefj/9c94b01b17d344d84f30 to your computer and use it in GitHub Desktop.
Glow MapReduce example
package main
import (
"encoding/csv"
"io"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/chrislusf/glow/flow"
)
// YearFile pairs one input file discovered by the directory walk with the
// year parsed from its file name.
type YearFile struct {
	Path string // path of the file as passed to the walk callback
	Year int    // year parsed from the file name
}
// NameSize holds one parsed CSV row from a year file, tagged with the year
// of the file it was read from.
type NameSize struct {
	Year   int    // year of the source file
	Name   string // first CSV column
	Size   int    // third CSV column, parsed as an integer
	Gender string // second CSV column, kept verbatim
}
// Channels connecting the pipeline stages started in main.
//
// yearFiles carries discovered files from the walk to the reader goroutines
// and has no buffer. nameSizes carries parsed rows from the readers to the
// flow and buffers up to 1000 of them.
var (
	yearFiles = make(chan YearFile)
	nameSizes = make(chan NameSize, 1000)
)
// visit is the filepath.WalkFunc used by main. For every non-directory entry
// whose base name contains "yob" followed by at least four characters, it
// parses those four characters as a year and sends the entry on yearFiles.
//
// Errors reported by the walk itself are returned so filepath.Walk stops and
// the caller can report them. A name with "yob" followed by four characters
// that are not an integer is logged and its error returned, which aborts the
// walk. A name with fewer than four characters after "yob" is logged and
// skipped.
func visit(path string, f os.FileInfo, err error) error {
	if err != nil {
		// The walk could not stat/read this entry and f may be nil; surface
		// the problem instead of silently carrying on.
		return err
	}
	if f.IsDir() {
		// Only files hold rows; a directory whose name contains "yob" must
		// not be handed to readYearFile.
		return nil
	}

	// Match on the file name only, so "yob" in a parent directory is ignored.
	base := filepath.Base(path)
	yobi := strings.Index(base, "yob")
	if yobi < 0 {
		return nil
	}

	yearText := base[yobi+len("yob"):]
	if len(yearText) < 4 {
		// Slicing [:4] here would panic; the name cannot hold a 4-digit year.
		log.Printf("skipping %q: no four-digit year after \"yob\"", path)
		return nil
	}

	year, err := strconv.Atoi(yearText[:4])
	if err != nil {
		log.Printf("FAIL couldn't parse int from %q", path)
		return err
	}
	yearFiles <- YearFile{path, year}
	return nil
}
// readYearFile parses the CSV file at yf.Path and sends one NameSize per row
// on nameSizes, tagged with yf.Year. Columns 0, 1 and 2 become Name, Gender
// and Size respectively.
//
// Rows with fewer than three columns, or whose third column is not an
// integer, are logged and skipped. Failure to open the file, or any CSV read
// error, is logged and ends processing of that file.
func readYearFile(yf YearFile) {
	f, err := os.Open(yf.Path)
	if err != nil {
		log.Printf("ERROR opening file %s: %s", yf.Path, err)
		return
	}
	// Without this every processed file stayed open until process exit.
	// The close error of a read-only file carries nothing actionable.
	defer f.Close()

	r := csv.NewReader(f)
	for {
		fields, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("ERROR reading %s: %s", yf.Path, err)
			return
		}
		if len(fields) < 3 {
			// The csv reader only checks that rows match the first row's
			// field count, so guard the indexing below explicitly.
			log.Printf("Skipping short record %q in %s", fields, yf.Path)
			continue
		}
		s, err := strconv.Atoi(fields[2])
		if err != nil {
			log.Printf("Could not parse %s error: %s", fields[2], err)
			continue
		}
		nameSizes <- NameSize{Year: yf.Year, Name: fields[0], Size: s, Gender: fields[1]}
	}
}
// main runs the pipeline in four stages:
//  1. filepath.Walk over "name_data" sends year files on yearFiles.
//  2. fileReaders goroutines read those files with readYearFile.
//  3. The readers send rows on nameSizes.
//  4. A glow flow sums every row's Size and logs the total.
//
// It returns once the walk and the flow have both finished.
func main() {
	// Number of goroutines parsing year files concurrently.
	const fileReaders = 10

	var wg sync.WaitGroup

	// Add must happen before the goroutine starts. Calling it inside the
	// goroutine races with wg.Wait, which could then return early.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Closing yearFiles ends the readers' range loops below.
		defer close(yearFiles)
		if err := filepath.Walk("name_data", visit); err != nil {
			log.Printf("Failed to walk path %s", err)
		}
	}()

	var fileWaitGroup sync.WaitGroup
	fileWaitGroup.Add(fileReaders)
	for i := 0; i < fileReaders; i++ {
		go func() {
			defer fileWaitGroup.Done()
			for yf := range yearFiles {
				readYearFile(yf)
			}
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		flow.New().Channel(nameSizes).Map(func(ns NameSize) int {
			return ns.Size
		}).Reduce(func(x int, y int) int {
			return x + y
		}).Map(func(x int) {
			log.Printf("Total Size: %d", x)
		}).Run()
	}()

	// After every reader has returned, nothing else is sent on nameSizes.
	// Closing it is presumably what lets the flow's Channel source finish
	// (confirm against glow's Channel semantics).
	fileWaitGroup.Wait()
	close(nameSizes)
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment