Skip to content

Instantly share code, notes, and snippets.

@djui
Last active March 2, 2016 16:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djui/05bc8720a4b3cfa1059b to your computer and use it in GitHub Desktop.
Save djui/05bc8720a4b3cfa1059b to your computer and use it in GitHub Desktop.
Takes a list of S3 URLs and applies a function on each file
package main
import (
"bufio"
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
const concurrency = 64
func ofInterest(f *os.File) bool {
c, err := ioutil.ReadAll(f)
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to read file:", err)
return false
}
return strings.Contains(string(c), "foo")
}
func main() {
if len(os.Args) != 2 {
fmt.Fprintln(os.Stderr, "Usage: foreach FILELIST")
}
filelist := os.Args[1]
r, err := os.Open(filelist)
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
s := session.New(&aws.Config{Region: aws.String("eu-west-1")})
downloader := s3manager.NewDownloader(s)
// Concurrency
sem := make(chan bool, concurrency)
wg := sync.WaitGroup{}
scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Fprint(os.Stderr, ".")
line := scanner.Text()
// Concurrency
wg.Add(1)
go func(line string) {
// Concurrency
sem <- true
defer func() {
<-sem
wg.Done()
}()
t, err := ioutil.TempFile("", "foreach_")
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to create tempfile", err)
return
}
defer func() { n := t.Name(); t.Close(); os.Remove(n) }()
bucket, key, err := bucketKeyFromURL(line)
i := &s3.GetObjectInput{Bucket: aws.String(bucket), Key: aws.String(key)}
// fmt.Fprintln(os.Stderr, "Downloading", bucket, key, "to", t.Name())
_, err = downloader.Download(t, i)
if err != nil {
if strings.HasPrefix(err.Error(), "NoSuchKey") {
fmt.Fprint(os.Stderr, ":")
} else if strings.HasPrefix(err.Error(), "InvalidParameter") {
fmt.Fprint(os.Stderr, ";")
} else {
fmt.Fprintln(os.Stderr, "Failed to download file", err)
}
return
}
_, err = t.Seek(0, 0)
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to rewind tempfile:", err)
return
}
if ofInterest(t) {
fmt.Println(line)
}
}(line)
}
wg.Wait()
if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}
func bucketKeyFromURL(urlString string) (string, string, error) {
u, err := url.Parse(urlString)
if err != nil {
return "", "", err
}
return u.Host, u.Path, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment