Last active
March 10, 2016 07:19
-
-
Save lateefj/9c94b01b17d344d84f30 to your computer and use it in GitHub Desktop.
Glow Map reduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/csv" | |
"io" | |
"log" | |
"os" | |
"path/filepath" | |
"strconv" | |
"strings" | |
"sync" | |
"github.com/chrislusf/glow/flow" | |
) | |
// YearFile pairs an input file with the year extracted from its name.
type YearFile struct {
	// Path is the file's location as reported by the directory walk.
	Path string
	// Year is the four-digit number that follows "yob" in the path.
	Year int
}
// NameSize is a single record read from a year file.
type NameSize struct {
	// Year is the year of the file the record was read from.
	Year int
	// Name is the first CSV field of the record.
	Name string
	// Size is the third CSV field of the record, parsed as an integer.
	Size int
	// Gender is the second CSV field of the record.
	Gender string
}
var ( | |
yearFiles chan YearFile | |
nameSizes chan NameSize | |
) | |
func init() { | |
yearFiles = make(chan YearFile, 0) | |
nameSizes = make(chan NameSize, 1000) | |
} | |
func visit(path string, f os.FileInfo, err error) error { | |
yobi := strings.Index(path, "yob") + 3 | |
if yobi > 2 { | |
year, err := strconv.Atoi(path[yobi : yobi+4]) | |
if err != nil { | |
log.Println("FAIL couldn't parse int ") | |
return err | |
} | |
yearFiles <- YearFile{path, year} | |
} | |
return nil | |
} | |
func readYearFile(yf YearFile) { | |
f, err := os.Open(yf.Path) | |
if err != nil { | |
log.Printf("ERROR opening file %s", yf.Path) | |
return | |
} | |
r := csv.NewReader(f) | |
for { | |
fields, err := r.Read() | |
if err == io.EOF { | |
break | |
} else if err != nil { | |
log.Printf("Done error is %s", err) | |
return | |
} | |
name := fields[0] | |
g := fields[1] | |
s, err := strconv.Atoi(fields[2]) | |
if err != nil { | |
log.Printf("Could not parse %s error: %s", fields[2], err) | |
continue | |
} | |
nameSizes <- NameSize{Year: yf.Year, Name: name, Size: s, Gender: g} | |
} | |
} | |
func main() { | |
var wg sync.WaitGroup | |
go func() { | |
wg.Add(1) | |
defer close(yearFiles) | |
err := filepath.Walk("name_data", visit) | |
if err != nil { | |
log.Printf("Failed to walk path %s", err) | |
} | |
wg.Done() | |
}() | |
var fileWaitGroup sync.WaitGroup | |
for i := 0; i < 10; i++ { | |
fileWaitGroup.Add(1) | |
go func() { | |
defer fileWaitGroup.Done() | |
for yf := range yearFiles { | |
readYearFile(yf) | |
} | |
}() | |
} | |
go func() { | |
wg.Add(1) | |
flow.New().Channel(nameSizes).Map(func(ns NameSize) int { | |
return ns.Size | |
}).Reduce(func(x int, y int) int { | |
return x + y | |
}).Map(func(x int) { | |
log.Printf("Total Size: %d", x) | |
}).Run() | |
wg.Done() | |
}() | |
fileWaitGroup.Wait() | |
close(nameSizes) | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment