Skip to content

Instantly share code, notes, and snippets.

@homingli
Created June 19, 2017 01:16
Show Gist options
  • Save homingli/39bf9c0271feae8f2d45d87ace8268b2 to your computer and use it in GitHub Desktop.
Save homingli/39bf9c0271feae8f2d45d87ace8268b2 to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"fmt"
"strings"
"log"
"os"
"time"
"encoding/json"
"strconv"
"net/url"
"io"
"bytes"
"compress/gzip"
"encoding/csv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// File is one entry of the "files" array in an S3 Inventory manifest.json.
// encoding/json fills these fields by (case-insensitive) key name.
type File struct {
	// Key is the object key of an inventory data file, within the manifest's bucket.
	Key string
	// Size is the size value reported for the data file in the manifest.
	Size int
	// MD5checksum is the checksum string reported for the data file in the manifest.
	MD5checksum string
}
// Tracker carries running counters across successive process_csv calls.
type Tracker struct {
	indexcounter int // sequence number used in the next index file name
	sizeTotal    int // summed size column of records in the current, not-yet-uploaded batch
	objcount     int // inventory records read so far
	incobjcount  int // records accepted by include_or_not so far
}
// FilterParams groups candidate criteria for selecting inventory records.
// NOTE(review): include_or_not does not consult this type; it appears to be
// scaffolding for future filtering — confirm before relying on it.
type FilterParams struct {
	prefix       string    // key prefix to match
	storageClass string    // storage class to match
	before       time.Time // upper bound on a record timestamp (presumably last-modified)
	after        time.Time // lower bound on a record timestamp (presumably last-modified)
}
// getfiles_from_manifest downloads the S3 Inventory manifest at s3://bucket/key
// and returns its "fileSchema" string and its "files" list.
//
// The program is aborted via log.Fatal if the manifest cannot be decoded, if
// either key is missing, or if either value has an unexpected JSON type.
func getfiles_from_manifest(awsSession *session.Session, bucket string, key string) (string, []File) {
	mbuff := s3dl(awsSession, bucket, key)

	var objmap map[string]json.RawMessage
	if err := json.Unmarshal(mbuff.Bytes(), &objmap); err != nil {
		log.Fatalf("decoding manifest s3://%v/%v: %v", bucket, key, err)
	}

	// Look keys up explicitly: dereferencing a missing *json.RawMessage entry
	// would panic with a nil pointer instead of explaining what is wrong.
	rawFiles, ok := objmap["files"]
	if !ok {
		log.Fatalf("manifest s3://%v/%v has no \"files\" entry", bucket, key)
	}
	var files []File
	if err := json.Unmarshal(rawFiles, &files); err != nil {
		log.Fatalf("decoding \"files\" in manifest s3://%v/%v: %v", bucket, key, err)
	}

	rawSchema, ok := objmap["fileSchema"]
	if !ok {
		log.Fatalf("manifest s3://%v/%v has no \"fileSchema\" entry", bucket, key)
	}
	var headers string
	if err := json.Unmarshal(rawSchema, &headers); err != nil {
		log.Fatalf("decoding \"fileSchema\" in manifest s3://%v/%v: %v", bucket, key, err)
	}

	return headers, files
}
// process_csv downloads the gzip-compressed inventory CSV at s3://bucket/key and
// reads it record by record.
//
// For every record accepted by include_or_not it:
//   - appends "col0,col1,col2\n" (bucket,key,size) to buff, and
//   - adds column 2 to t.sizeTotal.
//
// Whenever t.sizeTotal exceeds indexSizeThreshold, the buffered batch is
// uploaded as s3grouper-output/manifest.<t.indexcounter>.index, the counter is
// advanced, and buff and t.sizeTotal are reset. Read, decompress, parse, and
// upload failures abort the program via log.Fatal.
//
// NOTE(review): columns are read positionally (0=bucket, 1=key, 2=size, and
// include_or_not reads column 5); the manifest's fileSchema is not consulted,
// so confirm the inventory is configured with that column order.
// NOTE(review): intermediate batches go to a hard-coded bucket rather than
// main's -b value; changing that requires a signature change.
func process_csv(awsSession *session.Session, bucket string, key string, t *Tracker, buff *bytes.Buffer) {
	const indexSizeThreshold = 4 * 1024 * 1024 * 1024
	// minFields covers the highest column index read here or in include_or_not (5).
	const minFields = 6

	gzbuff := s3dl(awsSession, bucket, key)
	zr, err := gzip.NewReader(bytes.NewReader(gzbuff.Bytes()))
	if err != nil {
		log.Fatal(err)
	}
	defer zr.Close()

	csvr := csv.NewReader(zr)
	// Field counts are validated explicitly below, so let the reader accept
	// variable-length records instead of failing with ErrFieldCount.
	csvr.FieldsPerRecord = -1
	for {
		line, err := csvr.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			// Previously ignored, which let a nil record reach the indexing below and panic.
			log.Fatalf("reading inventory csv s3://%v/%v: %v", bucket, key, err)
		}
		t.objcount++
		if len(line) < minFields {
			log.Fatalf("inventory csv s3://%v/%v: record has %d fields, want at least %d", bucket, key, len(line), minFields)
		}
		if !include_or_not(line) {
			continue
		}
		t.incobjcount++
		buff.WriteString(strings.Join([]string{line[0], line[1], line[2]}, ",")) // bucket,key,size
		buff.WriteRune('\n')
		objsize, err := strconv.Atoi(line[2]) // size
		if err != nil {
			log.Fatal(err)
		}
		t.sizeTotal += objsize
		if t.sizeTotal > indexSizeThreshold {
			// stderr, like every other progress message in this program.
			fmt.Fprintln(os.Stderr, "threshold crossed. current size "+strconv.Itoa(t.sizeTotal))
			_ = s3ul(awsSession, "hml-oregon-bucket", "s3grouper-output/manifest."+strconv.Itoa(t.indexcounter)+".index", aws.NewWriteAtBuffer(buff.Bytes()))
			t.indexcounter++
			buff.Reset()
			t.sizeTotal = 0
		}
	}
}
// s3dl fetches the whole object s3://bucket/key into an in-memory buffer and
// returns it. Progress and elapsed time are reported on stderr; any download
// error aborts the program via log.Fatal.
func s3dl(awsSession *session.Session, bucket string, key string) *aws.WriteAtBuffer {
	fmt.Fprintf(os.Stderr, "attempting to download s3://%v/%v\n", bucket, key)
	began := time.Now()
	out := new(aws.WriteAtBuffer)
	input := &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	written, err := s3manager.NewDownloader(awsSession).Download(out, input)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(os.Stderr, "downloaded %v bytes in %v\n", written, time.Since(began))
	return out
}
// s3ul uploads the contents of data to s3://bucket/key using the s3manager
// uploader and returns the upload result. Progress and elapsed time are
// reported on stderr; any upload error aborts the program via log.Fatal.
func s3ul(awsSession *session.Session, bucket string, key string, data *aws.WriteAtBuffer) *s3manager.UploadOutput {
	fmt.Fprintf(os.Stderr, "attempting to upload s3://%v/%v\n", bucket, key)
	began := time.Now()
	input := &s3manager.UploadInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   bytes.NewReader(data.Bytes()),
	}
	out, err := s3manager.NewUploader(awsSession).Upload(input)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintf(os.Stderr, "wrote to s3://%v/%v in %v\n", bucket, key, time.Since(began))
	return out
}
// include_or_not reports whether an inventory record should go into the index.
// The rule keeps only records whose column 5 is exactly "STANDARD"
// (case-sensitive); swap the constant for "GLACIER" to select those objects
// instead. The record must have at least 6 fields, otherwise indexing panics.
func include_or_not(record []string) bool {
	const wantedStorageClass = "STANDARD"
	return record[5] == wantedStorageClass
}
// s3uri splits an S3 URI of the form s3://bucket/path/to/manifest.json into its
// bucket and object key. The key is returned without a leading "/" so it can be
// used directly as an S3 object key.
//
// The program is aborted via log.Fatal when:
//   - the URI does not start with "s3://",
//   - the URI cannot be parsed,
//   - it has no bucket, or
//   - its key does not end in "manifest.json".
func s3uri(uri *string) (string, string) {
	const malformed = "ERROR: malformed s3uri. Please use s3://path/to/manifest.json format."
	// Require the full scheme separator; a bare "s3" prefix also accepted e.g. "s3a://".
	if !strings.HasPrefix(*uri, "s3://") {
		log.Fatal(malformed)
	}
	u, err := url.Parse(*uri)
	if err != nil {
		log.Fatal(err)
	}
	// url.URL.Path is rooted ("/path/..."); S3 keys are not.
	key := strings.TrimPrefix(u.Path, "/")
	if u.Host == "" || !strings.HasSuffix(key, "manifest.json") {
		log.Fatal(malformed)
	}
	return u.Host, key
}
// main drives the tool.
//
// It reads the S3 Inventory manifest named by -m, runs every inventory CSV it
// lists through process_csv (which batches accepted records into index files),
// then uploads the final partial batch to the -b bucket and prints totals on
// stderr.
//
// Flags:
//
//	-m      s3://bucket/path/to/manifest.json (required)
//	-b      bucket for index files (defaults to "hml-oregon-bucket")
//	-debug  enable AWS SDK debug logging, including request signing
func main() {
	// cli arguments
	var outbucket, manifest string
	var debug bool
	flag.StringVar(&manifest, "m", "", "s3 path to inventory manifest")
	flag.StringVar(&outbucket, "b", "", "index files output bucket")
	flag.BoolVar(&debug, "debug", false, "show aws sdk debug output")
	flag.Parse()
	if manifest == "" {
		flag.Usage()
		log.Fatal("ERROR: need manifest defined.")
	}
	srcBucket, srcKey := s3uri(&manifest)
	if outbucket == "" {
		outbucket = "hml-oregon-bucket"
	}

	cfg := aws.NewConfig()
	if debug {
		logger := log.New(os.Stderr, "", log.LstdFlags)
		cfg = cfg.WithLogger(
			aws.LoggerFunc(func(args ...interface{}) { logger.Println(args...) }),
		).WithLogLevel(aws.LogDebugWithSigning)
	}
	// cfg must be handed to the session; it was previously built and then
	// discarded, so -debug had no effect.
	awsSession := session.Must(session.NewSessionWithOptions(session.Options{
		Config:            *cfg,
		SharedConfigState: session.SharedConfigEnable,
	}))

	// process manifest
	_, files := getfiles_from_manifest(awsSession, srcBucket, srcKey)

	// process csv: data files live in the same bucket as the manifest
	var t Tracker
	var indexbuffer bytes.Buffer
	t.indexcounter = 1
	for _, f := range files {
		process_csv(awsSession, srcBucket, f.Key, &t, &indexbuffer)
	}

	// upload last batch (records not yet flushed by process_csv's size threshold)
	if indexbuffer.Len() > 0 {
		fmt.Fprintln(os.Stderr, "last batch, current size "+strconv.Itoa(t.sizeTotal))
		_ = s3ul(awsSession, outbucket, "s3grouper-output/manifest."+strconv.Itoa(t.indexcounter)+".index", aws.NewWriteAtBuffer(indexbuffer.Bytes()))
	}
	fmt.Fprintf(os.Stderr, "%v objects processed. %v objects included.\n", t.objcount, t.incobjcount)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment