Created
November 30, 2012 01:55
-
-
Save eikeon/4173267 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"runtime"
	"time"

	"launchpad.net/goamz/aws"
	"launchpad.net/goamz/s3"
)
type reader struct { | |
b *s3.Bucket | |
path string | |
out chan *tweetItem | |
} | |
func NewReader(b *s3.Bucket, path string) *reader { | |
r := reader{b, path, nil} | |
r.out = make(chan *tweetItem) | |
go r.read() | |
return &r | |
} | |
func (r *reader) Name() string { | |
return fmt.Sprintf("{reader: %s}", r.path) | |
} | |
func (r *reader) Out() chan *tweetItem { | |
return r.out | |
} | |
func (r *reader) read() { | |
body, err := r.b.GetReader(r.path) | |
if err != nil { | |
log.Fatal(err) | |
} | |
gr, err := gzip.NewReader(body) | |
if err != nil { | |
log.Fatal(err) | |
} | |
reader := bufio.NewReaderSize(gr, 16*4096) | |
for { | |
if b, isPrefix, err := reader.ReadLine(); err == nil { | |
if isPrefix == true { | |
log.Fatal("line too long") | |
} | |
l := len(b) | |
tb := make(tweetItem, l) | |
copy(tb, b) | |
t, err := keyTweet(tb) | |
if err == nil { | |
if &t == nil { | |
panic("bla") | |
} | |
r.out <- &t | |
} else { | |
log.Printf("key tweet while reading: %s err: %v\n", r.Name(), err) | |
} | |
} else { | |
log.Printf("err reading line: %v\n", err) | |
break | |
} | |
} | |
err = gr.Close() | |
if err != nil { | |
panic(err) | |
} | |
err = body.Close() | |
if err != nil { | |
panic(err) | |
} | |
close(r.out) | |
} | |
func merge(channels []producer) producer { | |
for n := len(channels); n > 1; n = len(channels) { | |
var out_channels []producer | |
for i := 0; i < n/2; i++ { | |
var nc producer | |
if 2*i+1 < n { | |
nc = NewBinaryMerger((channels)[2*i], (channels)[2*i+1]) | |
} else { | |
nc = (channels)[2*i] | |
} | |
out_channels = append(out_channels, nc) | |
} | |
channels = out_channels[:] | |
} | |
return channels[0] | |
} | |
type producer interface { | |
Name() string | |
Out() chan *tweetItem | |
} | |
type binaryMerger struct { | |
a, b producer | |
out chan *tweetItem | |
} | |
func NewBinaryMerger(a, b producer) *binaryMerger { | |
bm := binaryMerger{a, b, nil} | |
bm.out = make(chan *tweetItem) | |
go bm.merge() | |
return &bm | |
} | |
func (bm *binaryMerger) Name() string { | |
return fmt.Sprintf("{a: %s, b: %s}", bm.a.Name(), bm.b.Name()) | |
} | |
func (bm *binaryMerger) Out() chan *tweetItem { | |
return bm.out | |
} | |
func (bm *binaryMerger) merge() { | |
next_a := <-bm.a.Out() | |
next_b := <-bm.b.Out() | |
for { | |
var next *tweetItem | |
if next_a != nil && next_b != nil { | |
if next_a.Key() < next_b.Key() { | |
next = next_a | |
next_a = <-bm.a.Out() | |
} else { | |
next = next_b | |
next_b = <-bm.b.Out() | |
} | |
} else if next_a != nil { | |
next = next_a | |
next_a = <-bm.a.Out() | |
} else if next_b != nil { | |
next = next_b | |
next_b = <-bm.b.Out() | |
} else { | |
break | |
} | |
bm.out <- next | |
} | |
close(bm.out) | |
} | |
func main() { | |
runtime.GOMAXPROCS(1) | |
auth, err := aws.EnvAuth() | |
if err != nil { | |
log.Fatal(err) | |
} | |
s := s3.New(auth, aws.USEast) | |
b := s.Bucket("lctwee-sorted") | |
merged := s.Bucket("twitter-merged") | |
err = merged.PutBucket(s3.Private) | |
if err != nil { | |
log.Println(err) | |
} | |
start := time.Date(2010, time.December, 31, 12, 0, 0, 0, time.UTC) | |
end := time.Date(2011, time.January, 1, 12, 0, 0, 0, time.UTC) | |
for current := start; current.Before(end); current = current.AddDate(0, 0, 1) { | |
for h := 0; h < 24; h++ { | |
hour := current.Format("2006/01/02") + fmt.Sprintf("/%02d/", h) | |
log.Println("merging:", hour) | |
name := hour + "tweet.gz" | |
isMerged := false | |
for { | |
body, err := merged.GetReader(name) | |
if err != nil { | |
if err.(*s3.Error).Code == "NoSuchKey" { | |
break | |
} | |
log.Printf("Status Code: %d Code: %s\n", err.(*s3.Error).StatusCode, err.(*s3.Error).Code) | |
log.Printf("error getting reader for %s: %s", name, err) | |
time.Sleep(10 * time.Second) | |
} else { | |
body.Close() | |
isMerged = true | |
break | |
} | |
} | |
log.Println(name, " is merged", isMerged) | |
response, err := b.List(hour, "/", "", 1000) | |
if err != nil { | |
log.Fatal(err) | |
} | |
var cs []producer | |
for _, u := range response.Contents { | |
cs = append(cs, NewReader(b, u.Key)) | |
} | |
m := merge(cs) | |
out := NewTweetWriter(name, merged) | |
for i := range m.Out() { | |
out.Write(i) | |
} | |
out.Close() | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/binary" | |
"encoding/json" | |
"io" | |
"log" | |
"time" | |
"code.google.com/p/snappy-go/snappy" | |
) | |
// tweetItem is a serialized record with a 12-byte little-endian header:
//   - bytes [0:8) hold an int64 key;
//   - bytes [8:12) hold a uint32 payload length;
//   - the payload follows at byte 12.
type tweetItem []byte

// Key decodes the first 8 bytes as a little-endian int64.
func (i tweetItem) Key() int64 {
	raw := binary.LittleEndian.Uint64(i[:8])
	return int64(raw)
}

// Value returns the payload following the 12-byte header, sized by the
// length stored in bytes 8 through 11. It shares memory with i.
func (i tweetItem) Value() []byte {
	const start = 12
	size := binary.LittleEndian.Uint32(i[8:start])
	end := start + size
	return i[start:end]
}

// Bytes returns the whole record, header included, without copying.
func (i tweetItem) Bytes() []byte {
	return []byte(i)
}
func (i tweetItem) Write(writer io.Writer) { | |
v := i.Value() | |
t, err := snappy.Decode(nil, v) | |
if err != nil { | |
panic(err) | |
} | |
writer.Write(t) | |
writer.Write([]byte("\n")) | |
} | |
func keyTweet(tw tweetItem) (rt tweetItem, err error) { | |
et, err := snappy.Encode(nil, tw) | |
if err != nil { | |
return nil, err | |
} | |
var tweet struct { | |
Created_At string | |
} | |
if err := json.Unmarshal(tw, &tweet); err == nil { | |
if t, err := time.Parse(time.RubyDate, tweet.Created_At); err == nil { | |
l := len(et) | |
out := make(tweetItem, 12+l) | |
key := t.UnixNano() | |
out[0] = byte(key) | |
out[1] = byte(key >> 8) | |
out[2] = byte(key >> 16) | |
out[3] = byte(key >> 24) | |
out[4] = byte(key >> 32) | |
out[5] = byte(key >> 40) | |
out[6] = byte(key >> 48) | |
out[7] = byte(key >> 56) | |
out[8] = byte(l) | |
out[9] = byte(l >> 8) | |
out[10] = byte(l >> 16) | |
out[11] = byte(l >> 24) | |
copy(out[12:12+l], et) | |
rt = out | |
} else { | |
return nil, err | |
} | |
} else { | |
log.Println("Could not unmarshal:", string(tw)) | |
return nil, err | |
} | |
return | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"compress/gzip" | |
"io" | |
"log" | |
"time" | |
"launchpad.net/goamz/s3" | |
) | |
type TweetWriter struct { | |
bucket *s3.Bucket | |
name string | |
current string | |
out *bytes.Buffer | |
} | |
func NewTweetWriter(name string, bucket *s3.Bucket) *TweetWriter { | |
tw := &TweetWriter{name: name, bucket: bucket} | |
tw.out = new(bytes.Buffer) | |
return tw | |
} | |
func (w *TweetWriter) Write(item *tweetItem) { | |
item.Write(w.out) | |
} | |
func (w *TweetWriter) Close() { | |
out := new(bytes.Buffer) | |
gw, err := gzip.NewWriterLevel(out, gzip.BestSpeed) | |
if err != nil { | |
panic(err) | |
} | |
_, err = io.Copy(gw, bytes.NewReader(w.out.Bytes())) | |
if err != nil { | |
log.Fatal("error copying to gzip writer", err) | |
} | |
gw.Close() | |
data := out.Bytes() | |
for { | |
err := w.bucket.PutReader(w.name, bytes.NewReader(data), int64(len(data)), "application/x-gzip", s3.Private) | |
if err == nil { | |
break | |
} else { | |
log.Println("error writing to s3:", err) | |
} | |
time.Sleep(1 * time.Second) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment