Skip to content

Instantly share code, notes, and snippets.

@calebdoxsey
Created January 20, 2014 20:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calebdoxsey/8528286 to your computer and use it in GitHub Desktop.
download usage data
package main
import (
"fmt"
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3"
"io"
"log"
"os"
"strings"
"sync"
)
var (
// s3Prefix is the key prefix for the November 2013 Twitter track audit data.
s3Prefix = "audit/twitter/prod/twitter/track/2013/11"
// Shared AWS/S3 handles, populated once in init().
auth *aws.Auth
client *s3.S3
bucket *s3.Bucket
)
const (
// WORKERS is the number of concurrent goroutines in each pipeline stage
// (both the S3 listers and the downloaders).
WORKERS = 200
)
// init configures the shared S3 client and bucket handles used by the
// rest of the program. Panics if AWS credentials cannot be resolved.
func init() {
	// BUG FIX: the original `auth, err := aws.GetAuth(...)` declared a
	// new local with :=, shadowing the package-level auth variable and
	// leaving it permanently nil.
	a, err := aws.GetAuth("ACCESS_KEY_ID", "SECRET_ACCESS_KEY")
	if err != nil {
		panic(err)
	}
	auth = &a
	client = s3.New(a, aws.USEast)
	bucket = client.Bucket("archive.hosey")
}
// keyProducer pages through the S3 listing under s3Prefix/dayHour and
// sends every key containing "netbase" down keyChannel. It returns the
// first listing error encountered, or nil when the listing is exhausted.
func keyProducer(dayHour string, keyChannel chan string) error {
	marker := ""
	for {
		resp, err := bucket.List(s3Prefix+"/"+dayHour, "", marker, 1000)
		if err != nil {
			return err
		}
		for _, key := range resp.Contents {
			if strings.Contains(key.Key, "netbase") {
				keyChannel <- key.Key
			}
		}
		// ROBUSTNESS: also stop on an empty page — indexing
		// Contents[len-1] below would panic if a truncated response
		// ever came back with no contents.
		if !resp.IsTruncated || len(resp.Contents) == 0 {
			break
		}
		// Resume the next page after the last key seen.
		marker = resp.Contents[len(resp.Contents)-1].Key
	}
	return nil
}
// keyConsumer downloads every key received on keyChannel until the
// channel is closed. Download failures are logged rather than silently
// dropped (the error return of downloadFile was previously ignored).
func keyConsumer(keyChannel chan string) {
	for key := range keyChannel {
		if err := downloadFile(key); err != nil {
			log.Println("download failed:", key, err)
		}
	}
}
// dayHourProducer emits every "DD/HH" prefix for the 30 days of the
// month (days 01-30, hours 00-23) on dayHourChannel, then closes the
// channel to signal completion.
func dayHourProducer(dayHourChannel chan string) {
	for day := 1; day <= 30; day++ {
		for hour := 0; hour < 24; hour++ {
			dayHourChannel <- fmt.Sprintf("%02d/%02d", day, hour)
		}
	}
	close(dayHourChannel)
}
// dayHourConsumer drains day/hour prefixes from dayHourChannel and lists
// the matching S3 keys onto keyChannel. Listing failures are logged so a
// bad hour doesn't vanish silently (keyProducer's error was previously
// discarded).
func dayHourConsumer(dayHourChannel, keyChannel chan string) {
	for dayHour := range dayHourChannel {
		log.Println("Getting files for", dayHour)
		if err := keyProducer(dayHour, keyChannel); err != nil {
			log.Println("listing failed:", dayHour, err)
		}
	}
}
// downloadFile fetches an S3 key into data/, flattening the key's path
// separators into underscores. A file that already exists locally is
// assumed complete and skipped.
func downloadFile(key string) error {
	filePath := "data/" + strings.Replace(key, "/", "_", -1)
	// Resumability: skip anything downloaded on a previous run.
	if _, err := os.Stat(filePath); err == nil {
		return nil
	}
	log.Println("Downloading", key)
	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer file.Close()
	reader, err := bucket.GetReader(key)
	if err != nil {
		// BUG FIX: previously a failed download left an empty/partial
		// file behind, which the os.Stat check above would treat as
		// complete on the next run.
		os.Remove(filePath)
		return err
	}
	defer reader.Close()
	if _, err := io.Copy(file, reader); err != nil {
		os.Remove(filePath) // same: don't leave a partial file to be skipped later
		return err
	}
	return nil
}
// main wires up a two-stage pipeline:
//
//	dayHourProducer -> WORKERS listers (dayHourConsumer: S3 key listing)
//	                -> WORKERS downloaders (keyConsumer: file download)
//
// and waits for BOTH stages to finish before exiting.
func main() {
	os.Mkdir("data", 0777)
	dayHourChannel := make(chan string)
	keyChannel := make(chan string)
	go dayHourProducer(dayHourChannel)

	// Stage 1: expand day/hour prefixes into S3 keys.
	var listers sync.WaitGroup
	for i := 0; i < WORKERS; i++ {
		listers.Add(1)
		go func() {
			defer listers.Done()
			dayHourConsumer(dayHourChannel, keyChannel)
		}()
	}
	// Close keyChannel only after every lister is done, so the
	// downloaders' range loops terminate.
	go func() {
		listers.Wait()
		close(keyChannel)
	}()

	// Stage 2: download each key.
	var downloaders sync.WaitGroup
	for i := 0; i < WORKERS; i++ {
		downloaders.Add(1)
		go func() {
			// BUG FIX: this goroutine previously called wg.Done() on
			// the FIRST WaitGroup, driving its counter negative (a
			// runtime panic) — and main waited on wg instead of wg2,
			// so downloads were never actually waited for.
			defer downloaders.Done()
			keyConsumer(keyChannel)
		}()
	}
	downloaders.Wait()
}
package main
import (
"bufio"
"compress/gzip"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/jmhodges/levigo"
)
type (
// nothing is a zero-width placeholder value so maps can be used as sets.
nothing struct{}
// Counts holds one day's line totals and unique-id totals, split by
// dev vs prod traffic.
Counts struct {
day string
allDev, uniqueDev, allProd, uniqueProd int
}
)
// WithFiles walks the data/ directory and invokes f for every regular
// file whose path contains "2013_11_<day>". The boolean passed to f
// reports whether the path looks like dev traffic (contains "--dev").
func WithFiles(day string, f func(string, bool)) {
	filepath.Walk("data", func(p string, info os.FileInfo, err error) error {
		// BUG FIX: the err parameter was previously ignored; when Walk
		// reports an error, info is nil and info.IsDir() would panic.
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		if strings.Contains(p, "2013_11_"+day) {
			fmt.Println("Processing", p)
			f(p, strings.Contains(p, "--dev"))
		}
		return nil
	})
}
// WithLines opens the file at p and invokes callback with the text
// before the first comma of each line that contains one (i.e. the first
// CSV field). Panics on open or read errors, consistent with the rest
// of this program's error handling.
func WithLines(p string, callback func(string)) {
	f, err := os.Open(p)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if i := strings.IndexByte(line, ','); i > 0 {
			callback(line[:i])
		}
	}
	// BUG FIX: scanner.Err() was never checked, so a read error (or a
	// line exceeding bufio.Scanner's default token limit) silently
	// truncated the input and skewed every count downstream.
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}
// GetDiskCounts counts the unique dev and prod ids for a day using two
// temporary on-disk LevelDB stores as sets (for data too large to
// dedupe in memory). Returns (uniqueDev, uniqueProd). The temporary
// databases are deleted before returning.
func GetDiskCounts(day string) (int, int) {
	opts := levigo.NewOptions()
	opts.SetCreateIfMissing(true)
	defer opts.Close() // previously leaked (levigo wraps C allocations)
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	ro := levigo.NewReadOptions()
	defer ro.Close()
	devSize := 0
	// NOTE: RemoveAll is deferred BEFORE Close so that (LIFO) the DB is
	// closed first, then its directory removed — the original order
	// removed the directory of a still-open DB.
	defer os.RemoveAll("kv-dev")
	dev, err := levigo.Open("kv-dev", opts)
	if err != nil {
		panic(err)
	}
	defer dev.Close()
	prodSize := 0
	defer os.RemoveAll("kv-prod")
	prod, err := levigo.Open("kv-prod", opts)
	if err != nil {
		panic(err)
	}
	defer prod.Close()
	WithFiles(day, func(p string, isDev bool) {
		WithLines(p, func(id string) {
			key := []byte(id)
			// Select the store and counter for this line's traffic type.
			db, size := prod, &prodSize
			if isDev {
				db, size = dev, &devSize
			}
			v, err := db.Get(ro, key)
			if err != nil {
				panic(err)
			}
			if v == nil {
				// First sighting of this id: record it and count it.
				// BUG FIX: Put's error was previously ignored — a
				// failed write would silently inflate later counts.
				if err := db.Put(wo, key, []byte{}); err != nil {
					panic(err)
				}
				*size++
				if *size%100000 == 0 {
					fmt.Print(".") // progress indicator
				}
			}
		})
	})
	return devSize, prodSize
}
// GetInMemoryCounts tallies one day's total line counts and unique id
// counts for dev and prod traffic, holding the id sets in memory as
// map[int64]struct{} to keep the footprint small.
func GetInMemoryCounts(day string) Counts {
	result := Counts{day: day}
	devIDs := make(map[int64]nothing)
	prodIDs := make(map[int64]nothing)
	WithFiles(day, func(path string, isDev bool) {
		WithLines(path, func(id string) {
			// NOTE(review): the ParseInt error is discarded, so any
			// non-numeric id collapses onto key 0 — assumes ids are
			// always numeric; confirm against the data format.
			n, _ := strconv.ParseInt(id, 10, 64)
			if isDev {
				result.allDev++
				devIDs[n] = nothing{}
			} else {
				result.allProd++
				prodIDs[n] = nothing{}
			}
		})
	})
	result.uniqueDev = len(devIDs)
	result.uniqueProd = len(prodIDs)
	return result
}
// GetCounts prints a CSV report to stdout: one row per day (01-30) with
// total and unique id counts for dev and prod, followed by a TOTAL row
// summing the per-day unique counts.
func GetCounts() {
	fmt.Println("day,all dev,unique dev,all prod,unique prod")
	var devTotal, prodTotal int
	for day := 1; day <= 30; day++ {
		c := GetInMemoryCounts(fmt.Sprintf("%02d", day))
		fmt.Printf("%v,%v,%v,%v,%v\n", c.day, c.allDev, c.uniqueDev, c.allProd, c.uniqueProd)
		devTotal += c.uniqueDev
		prodTotal += c.uniqueProd
	}
	// Note: this sums per-day uniques, so an id active on several days
	// is counted once per day it appears.
	fmt.Printf("TOTAL,%v,,%v,\n", devTotal, prodTotal)
}
// DumpDeDuped writes the de-duplicated dev and prod ids for one day to
// gzip-compressed text files under out/ (one id per line). De-dup is
// per-day and per-traffic-type, tracked in memory.
func DumpDeDuped(day string) {
	devSeen := map[string]nothing{}
	prodSeen := map[string]nothing{}
	os.Mkdir("out", 0777)
	// Remove stale outputs from a previous run.
	// BUG FIX: these previously removed "*.tar.gz", which never matched
	// the ".txt.gz" files actually created below.
	os.Remove("out/dev-" + day + ".txt.gz")
	dev, err := os.Create("out/dev-" + day + ".txt.gz")
	if err != nil {
		panic(err)
	}
	defer dev.Close()
	os.Remove("out/prod-" + day + ".txt.gz")
	prod, err := os.Create("out/prod-" + day + ".txt.gz")
	if err != nil {
		panic(err)
	}
	defer prod.Close()
	dz, err := gzip.NewWriterLevel(dev, gzip.BestCompression)
	if err != nil {
		panic(err)
	}
	defer dz.Close() // LIFO: gzip writers close (flush) before their files
	pz, err := gzip.NewWriterLevel(prod, gzip.BestCompression)
	if err != nil {
		panic(err)
	}
	defer pz.Close()
	WithFiles(day, func(p string, isDev bool) {
		WithLines(p, func(id string) {
			// Route the id to the writer/set for its traffic type.
			seen, w := prodSeen, pz
			if isDev {
				seen, w = devSeen, dz
			}
			if _, ok := seen[id]; !ok {
				// BUG FIX: the Write error was previously ignored,
				// allowing silent data loss in the output files.
				if _, err := w.Write([]byte(id + "\n")); err != nil {
					panic(err)
				}
				seen[id] = nothing{}
			}
		})
	})
}
// main dumps de-duplicated ids for each day of the month (01-30).
// The original version ran a producer goroutine feeding an unbuffered
// channel that was consumed sequentially in the same function — pure
// overhead with no concurrency; a plain loop is behaviorally identical.
func main() {
	for i := 1; i <= 30; i++ {
		day := fmt.Sprintf("%02d", i)
		fmt.Println("Processing", day)
		DumpDeDuped(day)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment