Skip to content

Instantly share code, notes, and snippets.

@cnnrznn
Created May 5, 2020 13:10
Show Gist options
  • Save cnnrznn/d51e31fd526bf7b124157cf7130dfd8e to your computer and use it in GitHub Desktop.
Save cnnrznn/d51e31fd526bf7b124157cf7130dfd8e to your computer and use it in GitHub Desktop.
A MapReduce (MR) job using CSV input/output
package main
import (
"fmt"
"github.com/cnnrznn/gomr"
"os"
"runtime"
"strings"
"sync"
)
type (
	// CsvJob is a stateless receiver type for the job's Map, Partition and
	// Reduce methods.
	CsvJob struct{}

	// Count pairs a value (key) with the number of times it has been seen
	// (val). Field order matters: positional literals such as Count{k, v}
	// rely on key coming first.
	Count struct {
		key string
		val int
	}
)
// Map drains in, splits every received string on commas and tallies how often
// each field occurs. Once in is closed it sends one Count per distinct field
// on out (in unspecified map-iteration order) and then closes out.
//
// Every item received from in must be a string; any other dynamic type makes
// the type assertion panic. Empty fields (for example from a trailing comma)
// are tallied under the empty-string key like any other field.
func (j *CsvJob) Map(in <-chan interface{}, out chan<- interface{}) {
	// Deferred so out is closed however the function exits.
	defer close(out)

	tally := map[string]int{}
	for raw, open := <-in; open; raw, open = <-in {
		fields := strings.Split(raw.(string), ",")
		for i := 0; i < len(fields); i++ {
			tally[fields[i]] += 1
		}
	}

	for field, n := range tally {
		out <- Count{key: field, val: n}
	}
}
// Partition forwards every item received from in to exactly one channel in
// outs, chosen by the first byte of the item's key modulo len(outs). Items
// with an empty key go to outs[0]. Equal keys therefore always land on the
// same output channel.
//
// Every item must be a Count; any other dynamic type makes the type assertion
// panic. wg.Done is called when in has been closed and drained. The channels
// in outs are not closed here.
func (j *CsvJob) Partition(in <-chan interface{}, outs []chan interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	for rec := range in {
		// The empty key has no first byte, so it keeps selector 0.
		var selector int
		if name := rec.(Count).key; name != "" {
			selector = int(name[0])
		}
		target := outs[selector%len(outs)]
		target <- rec
	}
}
// Reduce sums the val of every Count received from in, grouped by key. After
// in is closed and drained it sends one Count per distinct key on out, in
// unspecified map-iteration order.
//
// Every item must be a Count; any other dynamic type makes the type assertion
// panic. out is not closed here; wg.Done is called when the function returns.
func (j *CsvJob) Reduce(in <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	totals := map[string]int{}
	for {
		rec, open := <-in
		if !open {
			break
		}
		partial := rec.(Count)
		totals[partial.key] = totals[partial.key] + partial.val
	}

	for name, sum := range totals {
		out <- Count{key: name, val: sum}
	}
}
// main runs a CsvJob over the file named by the first command-line argument
// and prints every Count received from the job's output channel to standard
// output as a "key,val" line.
//
// It exits with status 2 and a usage message on standard error when no input
// path is given.
//
// NOTE(review): gomr.RunLocal and gomr.TextFileParallel are not visible here;
// presumably RunLocal starts p mappers and p reducers and TextFileParallel
// feeds the file's lines to the mapper inputs — confirm against gomr.
func main() {
	// Without this guard a missing argument surfaces as an index-out-of-range
	// panic on os.Args[1] instead of an actionable message.
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: <program> <input.csv>")
		os.Exit(2)
	}

	job := &CsvJob{}
	p := runtime.NumCPU()

	ins, out := gomr.RunLocal(p, p, job)
	gomr.TextFileParallel(os.Args[1], ins)

	for item := range out {
		c := item.(Count)
		fmt.Printf("%v,%v\n", c.key, c.val)
	}
}
a,file,with,commas
is,more,powerful,
than ,a,file,without,
a,a,a,a,file,with,is,not ,
without
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment