Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Cheap MapReduce in Go
package main
import (
	"bufio"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)
const (
	// MaxWorkers is the buffer size of the mapper collector channel created
	// in mapReduce; it bounds how many mapper tasks can be queued ahead of
	// the reducer dispatcher.
	MaxWorkers = 10
)
// Telemetry is one decoded telemetry record.
//
// Every leaf field is a string, which makes the struct comparable; mapper and
// reducer rely on that to use Telemetry values directly as map keys
// (map[Telemetry]int) when counting identical records.
//
// Note: encoding/json's omitempty option never omits struct-typed values, so
// the omitempty on the five nested groups has no effect when marshalling; it
// only matters on the string leaves.
type Telemetry struct {
	Request struct {
		Sender  string `json:"Sender,omitempty"`
		Trigger string `json:"Trigger,omitempty"`
	} `json:"Request,omitempty"`
	App struct {
		Program string `json:"Program,omitempty"`
		Build   string `json:"Build,omitempty"`
		License string `json:"License,omitempty"`
		Version string `json:"Version,omitempty"`
	} `json:"App,omitempty"`
	Connection struct {
		Type string `json:"Type,omitempty"`
	} `json:"Connection,omitempty"`
	Region struct {
		Continent string `json:"Continent,omitempty"`
		Country   string `json:"Country,omitempty"`
	} `json:"Region,omitempty"`
	Client struct {
		OsVersion    string `json:"OsVersion,omitempty"`
		Language     string `json:"Language,omitempty"`
		Architecture string `json:"Architecture,omitempty"`
	} `json:"Client,omitempty"`
}
// enumerateFiles walks the tree rooted at dirname in a background goroutine
// and sends the path (a string) of every non-directory entry on the returned
// channel. The channel is closed once the walk is finished, so callers can
// simply range over it.
//
// Entries that cannot be inspected are skipped: the walk is best effort and
// never aborts on a per-entry error.
func enumerateFiles(dirname string) chan interface{} {
	output := make(chan interface{})

	go func() {
		// Closing is what lets a consumer's range loop terminate.
		defer close(output)

		// The callback never returns a non-nil error, so Walk's own return
		// value carries no information here.
		_ = filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error {
			if err != nil {
				// Walk passes a nil FileInfo when it could not stat the
				// entry; touching f here would panic.
				return nil
			}
			if !f.IsDir() {
				output <- path
			}
			return nil
		})
	}()

	return output
}
// enumerateJSON reads filename line by line in a background goroutine and
// sends each line on the returned channel. Lines are delivered exactly as
// read, including the trailing '\n' when present. Lines starting with "#"
// are treated as meta comments and are not delivered.
//
// The channel is always closed when reading stops - at end of file, on a
// read error, or when the file cannot be opened - so callers can range over
// it without risk of blocking forever. Open and read failures are reported
// on standard error because the channel has no way to carry them.
func enumerateJSON(filename string) chan string {
	output := make(chan string)

	go func() {
		// Registered first so it runs last, after the file is closed.
		defer close(output)

		file, err := os.Open(filename)
		if err != nil {
			fmt.Fprintf(os.Stderr, "enumerateJSON: %v\n", err)
			return
		}
		defer file.Close()

		reader := bufio.NewReader(file)
		for {
			line, err := reader.ReadString('\n')

			// ReadString can return data together with an error (typically
			// the last line of a file that lacks a trailing newline), so the
			// line is handled before the error is looked at.
			if line != "" && !strings.HasPrefix(line, "#") {
				output <- line
			}

			if err == io.EOF {
				return
			}
			if err != nil {
				fmt.Fprintf(os.Stderr, "enumerateJSON: reading %s: %v\n", filename, err)
				return
			}
		}
	}()

	return output
}
type (
	// MapperCollector carries the per-task output channels of the mapper
	// tasks, one inner channel per dispatched task.
	MapperCollector chan chan interface{}

	// MapperFunc is the map step of a MapReduce job: it receives one input
	// item and a channel on which to deliver its output.
	MapperFunc func(interface{}, chan interface{})

	// ReducerFunc is the reduce step of a MapReduce job: it receives a
	// channel of mapper outputs and a channel on which to deliver its output.
	ReducerFunc func(chan interface{}, chan interface{})
)
func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) {
for item := range input {
taskOutput := make(chan interface{})
go mapper(item, taskOutput)
collector <- taskOutput
func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) {
for output := range collector {
reducerInput <- <-output
func mapper(filename interface{}, output chan interface{}) {
results := map[Telemetry]int{}
// start the enumeration of each JSON lines in the file
for line := range enumerateJSON(filename.(string)) {
// decode the telemetry JSON line
dec := json.NewDecoder(strings.NewReader(line))
var telemetry Telemetry
// if line cannot be JSON decoded then skip to next one
if err := dec.Decode(&telemetry); err == io.EOF {
} else if err != nil {
// stores Telemetry structure in the mapper results dictionary
previousCount, exists := results[telemetry]
if !exists {
results[telemetry] = 1
} else {
results[telemetry] = previousCount + 1
output <- results
func reducer(input chan interface{}, output chan interface{}) {
results := map[Telemetry]int{}
for matches := range input {
for key, value := range matches.(map[Telemetry]int) {
_, exists := results[key]
if !exists {
results[key] = value
} else {
results[key] = results[key] + value
output <- results
func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} {
reducerInput := make(chan interface{})
reducerOutput := make(chan interface{})
mapperCollector := make(MapperCollector, MaxWorkers)
go reducer(reducerInput, reducerOutput)
go reducerDispatcher(mapperCollector, reducerInput)
go mapperDispatcher(mapper, input, mapperCollector)
return <-reducerOutput
func main() {
fmt.Println("Processing. Please wait....")
// start the enumeration of files to be processed into a channel
input := enumerateFiles(".")
// this will start the map reduce work
results := mapReduce(mapper, reducer, input)
// open output file
f, err := os.Create("telemetry.csv")
if err != nil {
defer f.Close()
// make a write buffer
writer := csv.NewWriter(f)
for telemetry, value := range results.(map[Telemetry]int) {
var record []string
record = append(record, telemetry.Request.Sender)
record = append(record, telemetry.Request.Trigger)
record = append(record, telemetry.App.Program)
record = append(record, telemetry.App.Build)
record = append(record, telemetry.App.License)
record = append(record, telemetry.App.Version)
record = append(record, telemetry.Connection.Type)
record = append(record, telemetry.Region.Continent)
record = append(record, telemetry.Region.Country)
record = append(record, telemetry.Client.OsVersion)
record = append(record, telemetry.Client.Language)
record = append(record, telemetry.Client.Architecture)
// The last field of the CSV line is the aggregate count for each occurrence
record = append(record, strconv.Itoa(value))

This comment has been minimized.

Copy link

@kisamoto kisamoto commented Jul 14, 2015

Just FYI, you can simplify your map increments and addition:

    // stores Telemetry structure in the mapper results dictionary
    results[telemetry]++

    // merges a mapper's counts into the reducer results
    results[key] += value

By default it uses the initialisation value (0) so just add whatever to your key.


This comment has been minimized.

Copy link

@stgleb stgleb commented Sep 10, 2016

What is the reason to create the map like this?

    results := map[Telemetry]int{}

So is the value type int{}, and not just int?


This comment has been minimized.

Copy link

@Jared-Prime Jared-Prime commented Jan 6, 2017

@mcastilho I'm trying to understand something about mapperDispatcher -- it seems as if the dispatcher will block on collector <- taskOutput within that loop, making the execution in mapper effectively sequential rather than parallel. Am I misunderstanding? I would have thought that we'd want that step to be fully parallelized as well.


This comment has been minimized.

Copy link

@chrislusf chrislusf commented Jan 9, 2017

Welcome to try . You can just use the standalone version, and I believe the code would be much simpler.


This comment has been minimized.

Copy link

@night-codes night-codes commented Jan 25, 2017

@stgleb nope, value type is int.

results := map[Telemetry]int{}

equivalent to

results := make(map[Telemetry]int)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment