Created January 20, 2014 20:21
Save calebdoxsey/8528286 to your computer and use it in GitHub Desktop.
Download usage data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"github.com/mitchellh/goamz/aws" | |
"github.com/mitchellh/goamz/s3" | |
"io" | |
"log" | |
"os" | |
"strings" | |
"sync" | |
) | |
var ( | |
s3Prefix = "audit/twitter/prod/twitter/track/2013/11" | |
auth *aws.Auth | |
client *s3.S3 | |
bucket *s3.Bucket | |
) | |
const ( | |
WORKERS = 200 | |
) | |
func init() { | |
auth, err := aws.GetAuth("ACCESS_KEY_ID", "SECRET_ACCESS_KEY") | |
if err != nil { | |
panic(err) | |
} | |
client = s3.New(auth, aws.USEast) | |
bucket = client.Bucket("archive.hosey") | |
} | |
func keyProducer(dayHour string, keyChannel chan string) error { | |
marker := "" | |
for { | |
resp, err := bucket.List(s3Prefix+"/"+dayHour, "", marker, 1000) | |
if err != nil { | |
return err | |
} | |
for _, key := range resp.Contents { | |
if strings.Contains(key.Key, "netbase") { | |
keyChannel <- key.Key | |
} | |
} | |
if !resp.IsTruncated { | |
break | |
} | |
marker = resp.Contents[len(resp.Contents)-1].Key | |
} | |
return nil | |
} | |
func keyConsumer(keyChannel chan string) { | |
for key := range keyChannel { | |
downloadFile(key) | |
} | |
} | |
// dayHourProducer sends "DD/HH" for every hour 00-23 of days 01-30, in
// chronological order, and then closes dayHourChannel.
func dayHourProducer(dayHourChannel chan string) {
	defer close(dayHourChannel)
	const hoursPerDay = 24
	for n := 0; n < 30*hoursPerDay; n++ {
		day, hour := n/hoursPerDay+1, n%hoursPerDay
		dayHourChannel <- fmt.Sprintf("%02d/%02d", day, hour)
	}
}
func dayHourConsumer(dayHourChannel, keyChannel chan string) { | |
for dayHour := range dayHourChannel { | |
log.Println("Getting files for", dayHour) | |
keyProducer(dayHour, keyChannel) | |
} | |
} | |
func downloadFile(key string) error { | |
filePath := "data/" + strings.Replace(key, "/", "_", -1) | |
if _, err := os.Stat(filePath); err == nil { | |
return nil | |
} | |
log.Println("Downloading", key) | |
file, err := os.Create(filePath) | |
if err != nil { | |
return err | |
} | |
defer file.Close() | |
reader, err := bucket.GetReader(key) | |
if err != nil { | |
return err | |
} | |
defer reader.Close() | |
_, err = io.Copy(file, reader) | |
return err | |
} | |
func main() { | |
os.Mkdir("data", 0777) | |
dayHourChannel := make(chan string) | |
keyChannel := make(chan string) | |
go dayHourProducer(dayHourChannel) | |
var wg sync.WaitGroup | |
for i := 0; i < WORKERS; i++ { | |
wg.Add(1) | |
go func() { | |
dayHourConsumer(dayHourChannel, keyChannel) | |
wg.Done() | |
}() | |
} | |
go func() { | |
wg.Wait() | |
close(keyChannel) | |
}() | |
var wg2 sync.WaitGroup | |
for i := 0; i < WORKERS; i++ { | |
wg2.Add(1) | |
go func() { | |
keyConsumer(keyChannel) | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"compress/gzip" | |
"fmt" | |
"os" | |
"path/filepath" | |
"strconv" | |
"strings" | |
"github.com/jmhodges/levigo" | |
) | |
// nothing is a zero-size value, used as the element type of set-like maps.
type nothing struct{}

// Counts holds one day's id tallies: the total number of ids seen and the
// number of distinct ids, separately for dev and prod files.
type Counts struct {
	day        string
	allDev     int
	uniqueDev  int
	allProd    int
	uniqueProd int
}
// WithFiles walks the "data" directory tree and calls f for every
// non-directory entry whose path contains "2013_11_"+day. The bool passed to f
// reports whether the path contains "--dev". A "Processing" line is printed
// for each matching path.
//
// It panics if the walk reports an error (for example when "data" does not
// exist). This matches the panic-on-error style of the rest of this program,
// instead of crashing on a nil FileInfo.
func WithFiles(day string, f func(string, bool)) {
	err := filepath.Walk("data", func(p string, info os.FileInfo, err error) error {
		// When err is non-nil, info may be nil, so stop before using it.
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		if strings.Contains(p, "2013_11_"+day) {
			fmt.Println("Processing", p)
			f(p, strings.Contains(p, "--dev"))
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}
// WithLines reads the file at p line by line. For every line with a comma at
// index > 0, it calls callback with the text before the first comma. Lines
// with no comma, or that start with one, are skipped.
//
// It panics if the file cannot be opened or fully read. That includes a line
// longer than bufio.Scanner's default token limit, which would otherwise end
// the scan silently partway through the file.
func WithLines(p string, callback func(string)) {
	f, err := os.Open(p)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if i := strings.IndexByte(line, ','); i > 0 {
			callback(line[:i])
		}
	}
	// Scan returns false on both EOF and error; only Err tells them apart.
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}
func GetDiskCounts(day string) (int, int) { | |
opts := levigo.NewOptions() | |
opts.SetCreateIfMissing(true) | |
wo := levigo.NewWriteOptions() | |
ro := levigo.NewReadOptions() | |
devSize := 0 | |
dev, err := levigo.Open("kv-dev", opts) | |
if err != nil { | |
panic(err) | |
} | |
defer dev.Close() | |
defer os.RemoveAll("kv-dev") | |
prodSize := 0 | |
prod, err := levigo.Open("kv-prod", opts) | |
if err != nil { | |
panic(err) | |
} | |
defer prod.Close() | |
defer os.RemoveAll("kv-prod") | |
WithFiles(day, func(p string, isDev bool) { | |
WithLines(p, func(id string) { | |
key := []byte(id) | |
if isDev { | |
v, err := dev.Get(ro, key) | |
if err != nil { | |
panic(err) | |
} | |
if v == nil { | |
dev.Put(wo, key, []byte{}) | |
devSize++ | |
if devSize%100000 == 0 { | |
fmt.Print(".") | |
} | |
} | |
} else { | |
v, err := prod.Get(ro, key) | |
if err != nil { | |
panic(err) | |
} | |
if v == nil { | |
prod.Put(wo, key, []byte{}) | |
prodSize++ | |
if prodSize%100000 == 0 { | |
fmt.Print(".") | |
} | |
} | |
} | |
}) | |
}) | |
return devSize, prodSize | |
} | |
func GetInMemoryCounts(day string) Counts { | |
counts := Counts{day: day} | |
dev := map[int64]nothing{} | |
prod := map[int64]nothing{} | |
WithFiles(day, func(p string, isDev bool) { | |
WithLines(p, func(id string) { | |
i, _ := strconv.ParseInt(id, 10, 64) | |
if isDev { | |
counts.allDev++ | |
dev[i] = nothing{} | |
} else { | |
counts.allProd++ | |
prod[i] = nothing{} | |
} | |
}) | |
}) | |
counts.uniqueDev = len(dev) | |
counts.uniqueProd = len(prod) | |
return counts | |
} | |
func GetCounts() { | |
totalDev := 0 | |
totalProd := 0 | |
fmt.Println("day,all dev,unique dev,all prod,unique prod") | |
for i := 1; i <= 30; i++ { | |
day := fmt.Sprintf("%02d", i) | |
counts := GetInMemoryCounts(day) | |
fmt.Printf("%v,%v,%v,%v,%v\n", counts.day, counts.allDev, counts.uniqueDev, counts.allProd, counts.uniqueProd) | |
totalDev += counts.uniqueDev | |
totalProd += counts.uniqueProd | |
} | |
fmt.Printf("TOTAL,%v,,%v,\n", totalDev, totalProd) | |
} | |
func DumpDeDuped(day string) { | |
devSeen := map[string]nothing{} | |
prodSeen := map[string]nothing{} | |
os.Mkdir("out", 0777) | |
os.Remove("out/dev-" + day + ".tar.gz") | |
dev, err := os.Create("out/dev-" + day + ".txt.gz") | |
if err != nil { | |
panic(err) | |
} | |
defer dev.Close() | |
os.Remove("out/prod-" + day + ".tar.gz") | |
prod, err := os.Create("out/prod-" + day + ".txt.gz") | |
if err != nil { | |
panic(err) | |
} | |
defer prod.Close() | |
dz, err := gzip.NewWriterLevel(dev, gzip.BestCompression) | |
if err != nil { | |
panic(err) | |
} | |
defer dz.Close() | |
pz, err := gzip.NewWriterLevel(prod, gzip.BestCompression) | |
if err != nil { | |
panic(err) | |
} | |
defer pz.Close() | |
WithFiles(day, func(p string, isDev bool) { | |
WithLines(p, func(id string) { | |
if isDev { | |
if _, ok := devSeen[id]; !ok { | |
dz.Write([]byte(id + "\n")) | |
devSeen[id] = nothing{} | |
} | |
} else { | |
if _, ok := prodSeen[id]; !ok { | |
pz.Write([]byte(id + "\n")) | |
prodSeen[id] = nothing{} | |
} | |
} | |
}) | |
}) | |
} | |
func main() { | |
days := make(chan string) | |
go func() { | |
for i := 1; i <= 30; i++ { | |
day := fmt.Sprintf("%02d", i) | |
days <- day | |
} | |
close(days) | |
}() | |
for day := range days { | |
fmt.Println("Processing", day) | |
DumpDeDuped(day) | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.