Cheap MapReduce in Go
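A small, self-contained map/reduce pipeline built from goroutines and channels: it walks the current directory, reads each file as newline-delimited JSON telemetry records (lines starting with "#" are skipped), counts identical Telemetry records in parallel mapper goroutines, merges the per-file counts in a single reducer, and writes the aggregated totals to telemetry.csv.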
package main

import (
	"bufio"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
)

// MaxWorkers bounds how many mapper results may be queued for the reducer at once.
const (
	MaxWorkers = 10
)
type Telemetry struct {
	Request struct {
		Sender  string `json:"Sender,omitempty"`
		Trigger string `json:"Trigger,omitempty"`
	} `json:"Request,omitempty"`
	App struct {
		Program string `json:"Program,omitempty"`
		Build   string `json:"Build,omitempty"`
		License string `json:"License,omitempty"`
		Version string `json:"Version,omitempty"`
	} `json:"App,omitempty"`
	Connection struct {
		Type string `json:"Type,omitempty"`
	} `json:"Connection,omitempty"`
	Region struct {
		Continent string `json:"Continent,omitempty"`
		Country   string `json:"Country,omitempty"`
	} `json:"Region,omitempty"`
	Client struct {
		OsVersion    string `json:"OsVersion,omitempty"`
		Language     string `json:"Language,omitempty"`
		Architecture string `json:"Architecture,omitempty"`
	} `json:"Client,omitempty"`
}
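// For illustration only (not part of the original gist): a hypothetical input
// record that decodes into the Telemetry struct above. All field values are
// made up. Note that the real input files must carry one whole JSON object per
// line; it is wrapped here only for readability.
//
//	{"Request":{"Sender":"client","Trigger":"startup"},
//	 "App":{"Program":"editor","Build":"1200","License":"trial","Version":"2.1"},
//	 "Connection":{"Type":"wifi"},
//	 "Region":{"Continent":"EU","Country":"NL"},
//	 "Client":{"OsVersion":"10.0","Language":"en-US","Architecture":"amd64"}}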
// enumerateFiles walks dirname and emits the path of every regular file on the
// returned channel.
func enumerateFiles(dirname string) chan interface{} {
	output := make(chan interface{})
	go func() {
		filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !f.IsDir() {
				output <- path
			}
			return nil
		})
		close(output)
	}()
	return output
}
// enumerateJSON streams the lines of a newline-delimited JSON file on the
// returned channel, skipping "#" comment lines and blank lines.
func enumerateJSON(filename string) chan string {
	output := make(chan string)
	go func() {
		// always close the channel, even if the file cannot be opened,
		// so that readers ranging over it do not block forever
		defer close(output)
		file, err := os.Open(filename)
		if err != nil {
			return
		}
		defer file.Close()
		reader := bufio.NewReader(file)
		for {
			line, err := reader.ReadString('\n')
			// ignore any meta comments on top of the JSON file
			if !strings.HasPrefix(line, "#") && strings.TrimSpace(line) != "" {
				// add each JSON line to our enumeration channel
				output <- line
			}
			// stop on EOF or any other read error; a final line without a
			// trailing newline has already been emitted above
			if err != nil {
				break
			}
		}
	}()
	return output
}
// MapperCollector is a channel that collects the output channels of mapper tasks
type MapperCollector chan chan interface{}

// MapperFunc is a function that performs the mapping part of the MapReduce job
type MapperFunc func(interface{}, chan interface{})

// ReducerFunc is a function that performs the reduce part of the MapReduce job
type ReducerFunc func(chan interface{}, chan interface{})

// mapperDispatcher starts one mapper goroutine per input item and hands each
// task's output channel to the collector. Because the collector is buffered
// with MaxWorkers slots, roughly MaxWorkers mapper tasks are in flight at once.
func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) {
	for item := range input {
		taskOutput := make(chan interface{})
		go mapper(item, taskOutput)
		collector <- taskOutput
	}
	close(collector)
}

// reducerDispatcher forwards each mapper result, in dispatch order, to the reducer.
func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) {
	for output := range collector {
		reducerInput <- <-output
	}
	close(reducerInput)
}
// mapper counts identical Telemetry records within a single file and sends the
// per-file counts to the reducer.
func mapper(filename interface{}, output chan interface{}) {
	results := map[Telemetry]int{}
	// start the enumeration of each JSON line in the file
	for line := range enumerateJSON(filename.(string)) {
		// decode the telemetry JSON line
		dec := json.NewDecoder(strings.NewReader(line))
		var telemetry Telemetry
		// if the line cannot be JSON decoded then skip to the next one
		if err := dec.Decode(&telemetry); err != nil {
			continue
		}
		// count this Telemetry value in the mapper's local results
		results[telemetry]++
	}
	output <- results
}
func reducer(input chan interface{}, output chan interface{}) {
	results := map[Telemetry]int{}
	for matches := range input {
		for key, value := range matches.(map[Telemetry]int) {
			results[key] += value
		}
	}
	output <- results
}
// mapReduce wires the pipeline together: mapperDispatcher fans the input out to
// mapper goroutines, reducerDispatcher funnels their results into the single
// reducer, and the reducer's final aggregate is returned to the caller.
func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} {
	reducerInput := make(chan interface{})
	reducerOutput := make(chan interface{})
	mapperCollector := make(MapperCollector, MaxWorkers)
	go reducer(reducerInput, reducerOutput)
	go reducerDispatcher(mapperCollector, reducerInput)
	go mapperDispatcher(mapper, input, mapperCollector)
	return <-reducerOutput
}
func main() {
	// redundant on Go 1.5+ (already the default), but harmless
	runtime.GOMAXPROCS(runtime.NumCPU())
	fmt.Println("Processing. Please wait....")
	// start the enumeration of files to be processed into a channel
	input := enumerateFiles(".")
	// this will start the map reduce work
	results := mapReduce(mapper, reducer, input)
	// open the output file
	f, err := os.Create("telemetry.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	// make a CSV writer over the file
	writer := csv.NewWriter(f)
	for telemetry, value := range results.(map[Telemetry]int) {
		record := []string{
			telemetry.Request.Sender,
			telemetry.Request.Trigger,
			telemetry.App.Program,
			telemetry.App.Build,
			telemetry.App.License,
			telemetry.App.Version,
			telemetry.Connection.Type,
			telemetry.Region.Continent,
			telemetry.Region.Country,
			telemetry.Client.OsVersion,
			telemetry.Client.Language,
			telemetry.Client.Architecture,
			// the last field of the CSV line is the aggregate count for this record
			strconv.Itoa(value),
		}
		writer.Write(record)
	}
	writer.Flush()
	fmt.Println("Done!")
}
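
// -----------------------------------------------------------------------------
// A minimal sketch (not part of the original gist) showing that the mapReduce
// plumbing above is generic: the same dispatcher/collector machinery can run a
// trivial word count. Everything below is hypothetical example code; it assumes
// only the mapReduce, MapperFunc and ReducerFunc definitions in this file.
// -----------------------------------------------------------------------------
func wordCountExample() map[string]int {
	// feed a few made-up input strings into the pipeline, then close the channel
	input := make(chan interface{})
	go func() {
		for _, s := range []string{"go go mapreduce", "cheap mapreduce in go"} {
			input <- s
		}
		close(input)
	}()

	// mapper: count the words of a single input string
	wordMapper := func(item interface{}, output chan interface{}) {
		counts := map[string]int{}
		for _, w := range strings.Fields(item.(string)) {
			counts[w]++
		}
		output <- counts
	}

	// reducer: merge the per-item counts into one total
	wordReducer := func(in chan interface{}, out chan interface{}) {
		totals := map[string]int{}
		for partial := range in {
			for w, n := range partial.(map[string]int) {
				totals[w] += n
			}
		}
		out <- totals
	}

	return mapReduce(wordMapper, wordReducer, input).(map[string]int)
}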