Created
April 2, 2014 07:13
-
-
Save sudix/9929302 to your computer and use it in GitHub Desktop.
Bounded parallelism [Go Concurrency Patterns: Pipelines and cancellation - The Go Blog](http://blog.golang.org/pipelines)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/md5" | |
"errors" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"path/filepath" | |
"sort" | |
"sync" | |
) | |
// result is what a digester reports for a single file: the path it read,
// the MD5 digest of the bytes it got back, and the read error, if any.
// digester builds values of this type with a positional literal, so the
// field order below is significant.
type result struct {
	path string         // path received from the directory walk
	sum  [md5.Size]byte // md5.Sum of the data read; digest of no bytes when err != nil
	err  error          // error from reading the file; nil on success
}
// walkFiles starts a goroutine that walks the file tree rooted at root and
// sends the path of every regular file it finds on the returned string
// channel. Directories, symbolic links and other non-regular entries
// (devices, named pipes, sockets) are skipped. The consumer reads each path
// with ioutil.ReadFile, which would follow a symlink (failing on links to
// directories or dangling links) or block indefinitely on a named pipe.
//
// When the walk finishes, its outcome is sent on the returned error
// channel: nil if it completed, otherwise the first error returned by
// filepath.Walk. The paths channel is closed after that.
//
// If done is closed while the walk is waiting to hand a path to a consumer,
// the walk stops and reports the error "walk canceled". This happens, for
// example, when the consumer gives up early after a read error and nobody
// is left receiving from the paths channel.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	// Buffered so the walking goroutine can always deliver its final status
	// and exit, even if the caller never receives from errc.
	errc := make(chan error, 1)
	go func() {
		// Close paths after Walk returns so range loops over it terminate.
		defer close(paths)
		// No select needed for this send, since errc is buffered.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			// Walk reports entries via Lstat, so symlinks, pipes, devices
			// and directories all fail this test and are not digested.
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-done:
				return errors.New("walk canceled")
			}
			return nil
		})
	}()
	return paths, errc
}
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { | |
for path := range paths { | |
data, err := ioutil.ReadFile(path) | |
select { | |
case c <- result{path, md5.Sum(data), err}: | |
case <-done: | |
return | |
} | |
} | |
} | |
func MD5All(root string) (map[string][md5.Size]byte, error) { | |
// MD5All closes the done channel when it returns; it may do so before | |
// receiving all the values from c and errc. | |
done := make(chan struct{}) | |
defer close(done) | |
paths, errc := walkFiles(done, root) | |
// Start a fixed number of goroutines to read and digest files. | |
c := make(chan result) | |
var wg sync.WaitGroup | |
const numDigesters = 20 | |
wg.Add(numDigesters) | |
for i := 0; i < numDigesters; i++ { | |
go func() { | |
digester(done, paths, c) | |
wg.Done() | |
}() | |
} | |
go func() { | |
wg.Wait() | |
close(c) | |
}() | |
m := make(map[string][md5.Size]byte) | |
for r := range c { | |
if r.err != nil { | |
return nil, r.err | |
} | |
m[r.path] = r.sum | |
} | |
// Check whether the Walk failed. | |
if err := <-errc; err != nil { | |
return nil, err | |
} | |
return m, nil | |
} | |
func main() { | |
// Calculate the MD5 sum of all files under the specified directory, | |
// then print the results sorted by path name. | |
m, err := MD5All(os.Args[1]) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
var paths []string | |
for path := range m { | |
paths = append(paths, path) | |
} | |
sort.Strings(paths) | |
for _, path := range paths { | |
fmt.Printf("%x %s\n", m[path], path) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Could you give me an example of when execution reaches `return errors.New("walk canceled")`?