Last active
December 16, 2015 20:49
-
-
Save dnuffer/5495516 to your computer and use it in GitHub Desktop.
MapReduce in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This implementation is missing the shuffle step where the keys from the map | |
// step are sorted and aggregated before reducing. The reason is that the builtin | |
// map class has no ability to customize the comparison so we can't write a | |
// generic version that uses interface{} variables. | |
func MapReduce(mapper func(in_data Pair) Pair, | |
reducer func(input chan Pair, output chan interface{}), | |
input chan Pair, | |
pool_size int) []interface{} { | |
// written to by mapper goroutines, read by reducer | |
reducer_input := make(chan Pair, pool_size) | |
// written to by reducer, read by MapReduce for return value | |
reducer_output := make(chan interface{}) | |
// written to by mapper goroutines, read by await mapper completion goroutine | |
// so it knows when it is safe to close reducer_input | |
done := make(chan struct{}, pool_size) | |
// reducer goroutine simply reads from input and writes result to output | |
go func() { | |
reducer(reducer_input, reducer_output) | |
close(reducer_output) | |
}() | |
// await mapper completion goroutine. once all the mapper workers are done, | |
// close reducer_input | |
go func() { | |
for i := 0; i < pool_size; i++ { | |
<-done | |
} | |
close(reducer_input) | |
}() | |
// mapper goroutines | |
for i := 0; i < pool_size; i++ { | |
go func() { | |
for item := range input { | |
reducer_input <- mapper(item) | |
} | |
done <- struct{}{} | |
}() | |
} | |
result := []interface{}{} | |
for item := range reducer_output { | |
result = append(result, item) | |
} | |
return result | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment