Skip to content

Instantly share code, notes, and snippets.

@eikeon
Created November 30, 2012 01:55
Show Gist options
  • Save eikeon/4173267 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"compress/gzip"
"fmt"
"log"
"runtime"
"time"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
)
// reader streams one gzipped, newline-delimited tweet object out of an
// S3 bucket as a channel of re-keyed tweet records. Construct with
// NewReader, which starts the producing goroutine.
type reader struct {
b *s3.Bucket // source bucket
path string // object key within the bucket
out chan *tweetItem // unbuffered output stream; closed by read() when the object is exhausted
}
// NewReader builds a reader for the object at path in bucket b and
// immediately starts the goroutine that streams its contents on Out().
func NewReader(b *s3.Bucket, path string) *reader {
	r := &reader{
		b:    b,
		path: path,
		out:  make(chan *tweetItem),
	}
	go r.read()
	return r
}
// Name identifies this producer by the S3 object path it consumes.
func (r *reader) Name() string {
	return "{reader: " + r.path + "}"
}
// Out returns the channel on which read() delivers keyed tweets.
func (r *reader) Out() chan *tweetItem {
return r.out
}
// read streams the gzipped object at r.path, re-keys each
// newline-delimited tweet via keyTweet, and sends the results on r.out.
// It runs as a goroutine started by NewReader and closes r.out when the
// object is exhausted. Unrecoverable setup errors abort the process
// (log.Fatal), matching the original behavior.
func (r *reader) read() {
	body, err := r.b.GetReader(r.path)
	if err != nil {
		log.Fatal(err)
	}
	gr, err := gzip.NewReader(body)
	if err != nil {
		log.Fatal(err)
	}
	reader := bufio.NewReaderSize(gr, 16*4096)
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			// io.EOF lands here too; keep the original's log-and-stop.
			log.Printf("err reading line: %v\n", err)
			break
		}
		if isPrefix {
			// A line longer than the 64 KiB buffer means corrupt input.
			log.Fatal("line too long")
		}
		// ReadLine's buffer is reused on the next call; copy before
		// handing the bytes off.
		item := make(tweetItem, len(line))
		copy(item, line)
		t, err := keyTweet(item)
		if err != nil {
			log.Printf("key tweet while reading: %s err: %v\n", r.Name(), err)
			continue
		}
		// NOTE: the original guarded `if &t == nil { panic("bla") }` —
		// the address of a local is never nil, so that check was dead
		// code and has been removed.
		r.out <- &t
	}
	if err := gr.Close(); err != nil {
		panic(err)
	}
	if err := body.Close(); err != nil {
		panic(err)
	}
	close(r.out)
}
// merge combines an arbitrary number of key-sorted producers into one by
// repeatedly pairing them into binary mergers until a single producer
// remains. Panics if channels is empty (as the original did).
//
// BUG FIX: the original looped `for i := 0; i < n/2; i++`, which for odd
// n >= 3 dropped the last producer entirely (e.g. with 3 inputs,
// channels[2] was never merged and its goroutine leaked).
func merge(channels []producer) producer {
	for len(channels) > 1 {
		n := len(channels)
		next := make([]producer, 0, (n+1)/2)
		for i := 0; i < n; i += 2 {
			if i+1 < n {
				next = append(next, NewBinaryMerger(channels[i], channels[i+1]))
			} else {
				// Odd element out: carry it into the next round unchanged.
				next = append(next, channels[i])
			}
		}
		channels = next
	}
	return channels[0]
}
// producer is the common interface of reader and binaryMerger: a named
// source of keyed tweet records whose Out channel is closed when the
// source is exhausted.
type producer interface {
Name() string
Out() chan *tweetItem
}
// binaryMerger merges two key-sorted producers into a single sorted
// stream, mergesort-style. Construct with NewBinaryMerger.
type binaryMerger struct {
a, b producer // the two input streams
out chan *tweetItem // merged output; closed once both inputs are drained
}
// NewBinaryMerger wires producers a and b into a merger and starts the
// goroutine that interleaves their output in key order.
func NewBinaryMerger(a, b producer) *binaryMerger {
	m := &binaryMerger{
		a:   a,
		b:   b,
		out: make(chan *tweetItem),
	}
	go m.merge()
	return m
}
// Name describes the merger in terms of its two input producers.
func (bm *binaryMerger) Name() string {
	return "{a: " + bm.a.Name() + ", b: " + bm.b.Name() + "}"
}
// Out returns the merged, key-ordered output channel.
func (bm *binaryMerger) Out() chan *tweetItem {
return bm.out
}
// merge interleaves the two input streams in ascending Key order and
// closes bm.out once both are exhausted. A nil item received from an
// input marks that input's channel as closed/drained.
func (bm *binaryMerger) merge() {
	headA := <-bm.a.Out()
	headB := <-bm.b.Out()
	for headA != nil || headB != nil {
		var item *tweetItem
		switch {
		case headB == nil || (headA != nil && headA.Key() < headB.Key()):
			// Take from a when b is drained, or when a's key is smaller.
			item = headA
			headA = <-bm.a.Out()
		default:
			// Take from b on ties, when b's key is smaller, or when a is drained.
			item = headB
			headB = <-bm.b.Out()
		}
		bm.out <- item
	}
	close(bm.out)
}
// main walks every hour of a one-day window, merges that hour's sorted
// tweet objects from the lctwee-sorted bucket, and writes the merged
// gzip back to the twitter-merged bucket, skipping hours that already
// have output.
func main() {
	runtime.GOMAXPROCS(1)
	auth, err := aws.EnvAuth()
	if err != nil {
		log.Fatal(err)
	}
	s := s3.New(auth, aws.USEast)
	b := s.Bucket("lctwee-sorted")
	merged := s.Bucket("twitter-merged")
	err = merged.PutBucket(s3.Private)
	if err != nil {
		// Bucket may already exist; log and carry on.
		log.Println(err)
	}
	start := time.Date(2010, time.December, 31, 12, 0, 0, 0, time.UTC)
	end := time.Date(2011, time.January, 1, 12, 0, 0, 0, time.UTC)
	for current := start; current.Before(end); current = current.AddDate(0, 0, 1) {
		for h := 0; h < 24; h++ {
			hour := current.Format("2006/01/02") + fmt.Sprintf("/%02d/", h)
			log.Println("merging:", hour)
			name := hour + "tweet.gz"
			// Probe the destination: retry transient errors, treat
			// NoSuchKey as "not merged yet".
			isMerged := false
			for {
				body, err := merged.GetReader(name)
				if err == nil {
					body.Close()
					isMerged = true
					break
				}
				// BUG FIX: the original asserted err.(*s3.Error)
				// unconditionally, which panics on non-S3 (e.g. network)
				// errors; use the checked form.
				if s3err, ok := err.(*s3.Error); ok {
					if s3err.Code == "NoSuchKey" {
						break
					}
					log.Printf("Status Code: %d Code: %s\n", s3err.StatusCode, s3err.Code)
				}
				log.Printf("error getting reader for %s: %s", name, err)
				time.Sleep(10 * time.Second)
			}
			if isMerged {
				// BUG FIX: the original computed isMerged, logged it, and
				// then re-merged the hour anyway; skip completed hours.
				log.Println(name, " is merged", isMerged)
				continue
			}
			response, err := b.List(hour, "/", "", 1000)
			if err != nil {
				log.Fatal(err)
			}
			var cs []producer
			for _, u := range response.Contents {
				cs = append(cs, NewReader(b, u.Key))
			}
			if len(cs) == 0 {
				// BUG FIX: merge() would panic on an empty slice.
				log.Println("no input objects for", hour)
				continue
			}
			m := merge(cs)
			out := NewTweetWriter(name, merged)
			for item := range m.Out() {
				out.Write(item)
			}
			out.Close()
		}
	}
}
package main
import (
"encoding/binary"
"encoding/json"
"io"
"log"
"time"
"code.google.com/p/snappy-go/snappy"
)
type tweetItem []byte
func (i tweetItem) Key() int64 {
return int64(binary.LittleEndian.Uint64(i[0:8]))
}
func (i tweetItem) Value() []byte {
l := binary.LittleEndian.Uint32(i[8:12])
return i[12 : 12+l]
}
func (i tweetItem) Bytes() []byte {
return i
}
// Write snappy-decompresses the record's payload and writes it to
// writer followed by a newline. Panics if the payload is not valid
// snappy data.
func (i tweetItem) Write(writer io.Writer) {
	decoded, err := snappy.Decode(nil, i.Value())
	if err != nil {
		panic(err)
	}
	writer.Write(decoded)
	writer.Write([]byte("\n"))
}
// keyTweet converts one raw JSON tweet into a sortable tweetItem:
// an 8-byte little-endian key (the tweet's created_at as UnixNano),
// a 4-byte little-endian payload length, then the snappy-compressed
// tweet bytes. Returns an error if compression, JSON decoding, or
// timestamp parsing fails.
func keyTweet(tw tweetItem) (rt tweetItem, err error) {
	et, err := snappy.Encode(nil, tw)
	if err != nil {
		return nil, err
	}
	var tweet struct {
		Created_At string
	}
	if err := json.Unmarshal(tw, &tweet); err != nil {
		log.Println("Could not unmarshal:", string(tw))
		return nil, err
	}
	t, err := time.Parse(time.RubyDate, tweet.Created_At)
	if err != nil {
		return nil, err
	}
	// IMPROVEMENT: the original wrote the key and length byte by byte
	// even though encoding/binary is already imported; use the stdlib
	// helpers (identical little-endian layout, read back by Key/Value).
	out := make(tweetItem, 12+len(et))
	binary.LittleEndian.PutUint64(out[0:8], uint64(t.UnixNano()))
	binary.LittleEndian.PutUint32(out[8:12], uint32(len(et)))
	copy(out[12:], et)
	return out, nil
}
package main
import (
"bytes"
"compress/gzip"
"io"
"log"
"time"
"launchpad.net/goamz/s3"
)
// TweetWriter accumulates decompressed tweets in memory and, on Close,
// gzips the whole buffer and uploads it to S3 under name.
type TweetWriter struct {
bucket *s3.Bucket // destination bucket
name string // object key to PUT on Close
current string // NOTE(review): never read or written in this file — confirm it is unused before removing
out *bytes.Buffer // newline-delimited tweet bytes buffered until Close
}
// NewTweetWriter returns a writer that will upload its buffered
// contents to bucket under name when Close is called.
func NewTweetWriter(name string, bucket *s3.Bucket) *TweetWriter {
	return &TweetWriter{
		name:   name,
		bucket: bucket,
		out:    new(bytes.Buffer),
	}
}
// Write appends item's decompressed payload (plus a trailing newline)
// to the in-memory buffer; nothing is uploaded until Close.
func (w *TweetWriter) Write(item *tweetItem) {
item.Write(w.out)
}
// Close gzips everything buffered so far and uploads it to S3 under
// w.name, retrying once per second until the PUT succeeds.
func (w *TweetWriter) Close() {
	compressed := new(bytes.Buffer)
	gw, err := gzip.NewWriterLevel(compressed, gzip.BestSpeed)
	if err != nil {
		panic(err)
	}
	// Read via a fresh bytes.Reader so w.out itself is left intact.
	_, err = io.Copy(gw, bytes.NewReader(w.out.Bytes()))
	if err != nil {
		log.Fatal("error copying to gzip writer", err)
	}
	// BUG FIX: the original ignored gw.Close()'s error; a failed flush
	// of the gzip trailer would silently upload a truncated archive.
	if err := gw.Close(); err != nil {
		log.Fatal("error closing gzip writer: ", err)
	}
	data := compressed.Bytes()
	for {
		err := w.bucket.PutReader(w.name, bytes.NewReader(data), int64(len(data)), "application/x-gzip", s3.Private)
		if err == nil {
			return
		}
		log.Println("error writing to s3:", err)
		time.Sleep(1 * time.Second)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment