Skip to content

Instantly share code, notes, and snippets.

@jaytaylor
Last active June 3, 2019 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaytaylor/57799723734dd90e3a8510e0de1ba38f to your computer and use it in GitHub Desktop.
Save jaytaylor/57799723734dd90e3a8510e0de1ba38f to your computer and use it in GitHub Desktop.
High-performance multi-threaded MD5 hash calculator for multi-part file uploads to S3 / object storage.
package main
// Also see the Python version: https://gist.github.com/jaytaylor/76de9c99acbfac637e68f78809dbd27e
import (
"bytes"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"os"
"sync"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var (
PartSize int
EmitBase64 bool
SingleThreaded bool
Quiet bool
Verbose bool
)
func init() {
rootCmd.PersistentFlags().IntVarP(&PartSize, "part-size", "s", 1073741824, "Segment size for individual parts, in bytes (default=1073741824, i.e. 1GB)")
rootCmd.PersistentFlags().BoolVarP(&EmitBase64, "base64", "b", false, "Display in base64 instead of hexadecimal")
rootCmd.PersistentFlags().BoolVarP(&Quiet, "single-threaded", "", false, "Use the single-threaded version (otherwise multi-threaded is enabled by default)")
rootCmd.PersistentFlags().BoolVarP(&Quiet, "quiet", "q", false, "Activate quiet log output")
rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "Activate verbose log output")
}
func main() {
if err := rootCmd.Execute(); err != nil {
log.Fatal(err.Error())
}
}
var rootCmd = &cobra.Command{
Use: "md5sum_multi_part [filename]",
Short: "Multi-part MD5 file checksum hash calculator",
Long: "MD5 hash calculator for multi-part file uploads to AWS S3 and S3-compatible object storage services",
Args: cobra.MinimumNArgs(1),
PreRun: func(_ *cobra.Command, _ []string) {
initLogging()
},
RunE: func(cmd *cobra.Command, args []string) error {
var (
result string
err error
)
if SingleThreaded {
result, err = multiPartMD5SumST(args[0])
} else {
result, err = multiPartMD5SumMT(args[0])
}
if err != nil {
return err
}
fmt.Println(result)
return nil
},
}
// multiPartMD5SumMT is the multi-threaded version.
func multiPartMD5SumMT(filename string) (string, error) {
f, err := os.OpenFile(filename, os.O_RDONLY, os.FileMode(int(0644)))
if err != nil {
return "", fmt.Errorf("opening file %q: %s", filename, err)
}
defer func() {
log.Debug("closing file")
if err := f.Close(); err != nil {
log.Errorf("Unexpected issue closing file %q: %s", filename, err)
}
log.Debug("closed file")
}()
var (
hashes = [][]byte{}
wg sync.WaitGroup
mu sync.Mutex
)
for part := 1; ; part++ {
chunk := make([]byte, PartSize)
if n, err := f.Read(chunk); err == io.EOF {
log.Debug("EOF reached")
break
} else if err != nil {
return "", fmt.Errorf("read error: %s", err)
} else if n < PartSize {
// Resize the array to only the read data.
chunk = chunk[:n]
}
// Pre-allocate slice spot for upcoming hash value.
hashes = append(hashes, nil)
go func(chunk []byte, part int) {
log.Debugf("Received chunk of %v bytes for part=%v", len(chunk), part)
h := md5.New()
if _, err := h.Write(chunk); err != nil {
log.Fatalf("writing data to md5 hasher: %s", err)
}
sum := h.Sum(nil)
mu.Lock()
hashes[part-1] = sum
mu.Unlock()
log.Debugf("digest=%x part=%v", sum, part)
wg.Done()
}(chunk, part)
wg.Add(1)
}
log.Debug("Waiting for goroutines to complete..")
wg.Wait()
log.Debugf("Collected hashes for %v chunks", len(hashes))
mh := md5.New()
log.Debugf("Joined hash = %x", bytes.Join(hashes, nil))
mh.Write(bytes.Join(hashes, nil))
result := fmt.Sprintf("%x", mh.Sum(nil))
log.Debugf("Final hex digest = %s", result)
if EmitBase64 {
result = base64.StdEncoding.EncodeToString(mh.Sum(nil))
}
result = fmt.Sprintf("%v-%v", result, len(hashes))
return result, nil
}
// multiPartMD5SumMT is the single-threaded version.
func multiPartMD5SumST(filename string) (string, error) {
f, err := os.OpenFile(filename, os.O_RDONLY, os.FileMode(int(0644)))
if err != nil {
return "", fmt.Errorf("opening file %q: %s", filename, err)
}
defer func() {
log.Debug("closing file")
if err := f.Close(); err != nil {
log.Errorf("Unexpected issue closing file %q: %s", filename, err)
}
log.Debug("closed file")
}()
var (
chunk = make([]byte, PartSize)
h = md5.New()
hashes = [][]byte{}
)
for part := 1; ; part++ {
if n, err := f.Read(chunk); err == io.EOF {
log.Debug("EOF reached")
break
} else if err != nil {
return "", fmt.Errorf("read error: %s", err)
} else if n < PartSize {
// Resize the array to only the read data.
chunk = chunk[:n]
}
log.Debugf("Received chunk of %v bytes for part=%v", len(chunk), part)
h.Reset()
if _, err = h.Write(chunk); err != nil {
return "", fmt.Errorf("writing data to md5 hasher: %s", err)
}
sum := h.Sum(nil)
hashes = append(hashes, sum)
log.Debugf("digest=%x part=%v", sum, part)
}
log.Debugf("Collected hashes for %v chunks", len(hashes))
h.Reset()
log.Debugf("Joined hash = %x", bytes.Join(hashes, nil))
h.Write(bytes.Join(hashes, nil))
result := fmt.Sprintf("%x", h.Sum(nil))
log.Debugf("Final hex digest = %s", result)
if EmitBase64 {
result = base64.StdEncoding.EncodeToString(h.Sum(nil))
}
result = fmt.Sprintf("%v-%v", result, len(hashes))
return result, nil
}
func initLogging() {
level := log.InfoLevel
if Verbose {
level = log.DebugLevel
}
if Quiet {
level = log.ErrorLevel
}
log.SetLevel(level)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment