Last active
December 10, 2015 20:18
-
-
Save eikeon/4487192 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"compress/gzip" | |
"encoding/binary" | |
"encoding/json" | |
"fmt" | |
"io" | |
"log" | |
"math/rand" | |
"os" | |
"path" | |
"runtime" | |
"sync" | |
"time" | |
"code.google.com/p/snappy-go/snappy" | |
"launchpad.net/goamz/aws" | |
"launchpad.net/goamz/s3" | |
) | |
// Byte offsets inside a keyed tweetItem: an 8-byte key, then a 4-byte
// payload length, then the payload itself.
const (
	tweetKeyLen    = 8
	tweetHeaderLen = 12
)

// tweetItem is a byte slice holding one tweet. In its keyed form it starts
// with a 12-byte little-endian header (64-bit key, 32-bit payload length)
// followed by the payload.
type tweetItem []byte

// Key decodes the first eight bytes as a little-endian 64-bit integer.
func (ti tweetItem) Key() int64 {
	raw := binary.LittleEndian.Uint64(ti[:tweetKeyLen])
	return int64(raw)
}

// Value returns the payload: the bytes that follow the header, as many as the
// little-endian 32-bit length stored in bytes 8-11 says.
func (ti tweetItem) Value() []byte {
	end := tweetHeaderLen + binary.LittleEndian.Uint32(ti[tweetKeyLen:tweetHeaderLen])
	return ti[tweetHeaderLen:end]
}

// Bytes returns the whole item, header included, without copying.
func (ti tweetItem) Bytes() []byte {
	return ti
}
func keyTweets(unkeyedTweets chan tweetItem, tweets chan tweetItem) { | |
for tw := range unkeyedTweets { | |
et, err := snappy.Encode(nil, tw) | |
if err != nil { | |
panic(err) | |
} | |
var tweet struct { | |
Created_At string | |
} | |
if err := json.Unmarshal(tw, &tweet); err == nil { | |
if t, err := time.Parse(time.RubyDate, tweet.Created_At); err == nil { | |
l := len(et) | |
out := make(tweetItem, 12+l) | |
key := t.UnixNano() | |
out[0] = byte(key) | |
out[1] = byte(key >> 8) | |
out[2] = byte(key >> 16) | |
out[3] = byte(key >> 24) | |
out[4] = byte(key >> 32) | |
out[5] = byte(key >> 40) | |
out[6] = byte(key >> 48) | |
out[7] = byte(key >> 56) | |
out[8] = byte(l) | |
out[9] = byte(l >> 8) | |
out[10] = byte(l >> 16) | |
out[11] = byte(l >> 24) | |
copy(out[12:12+l], et) | |
tweets <- out | |
} else { | |
log.Fatal(err) | |
} | |
} else { | |
log.Println("Could not unmarshal:", string(tw)) | |
panic("") | |
} | |
} | |
} | |
func NewTweetReader(reader *bufio.Reader) *TweetReader { | |
unkeyedTweets := make(chan tweetItem, 4096) | |
tweets := make(chan tweetItem, 4096) | |
go func() { | |
for { | |
if b, isPrefix, err := reader.ReadLine(); err == nil { | |
if isPrefix == true { | |
log.Fatal("line too long") | |
} | |
l := len(b) | |
tb := make(tweetItem, l) | |
copy(tb, b) | |
unkeyedTweets <- tb | |
} else { | |
break | |
} | |
} | |
close(unkeyedTweets) | |
}() | |
go func() { | |
wg := sync.WaitGroup{} | |
for i := runtime.NumCPU(); i > 0; i-- { | |
wg.Add(1) | |
go func() { | |
keyTweets(unkeyedTweets, tweets) | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
close(tweets) | |
}() | |
return &TweetReader{tweets} | |
} | |
type TweetReader struct { | |
tweets chan tweetItem | |
} | |
// Item is a record that can expose itself as a byte slice.
type Item interface {
	// Disabled ordering method, kept for reference:
	//Less(b Item) bool

	// Bytes returns the item's byte representation.
	Bytes() []byte
}
func (tr *TweetReader) Read() (item Item) { | |
tw, ok := <-tr.tweets | |
if ok { | |
item = tw | |
} | |
return item | |
} | |
// TweetWriter writes tweets into gzip files, keeping one file open at a
// time. Only name needs to be set by the caller; the other fields are managed
// by getWriter.
type TweetWriter struct {
	// name is the file name used as the last element of every output path.
	name string
	// current is the relative path of the file that is open right now.
	current string
	// out is the open output file, or nil before the first write.
	out *os.File
	// writer compresses into out.
	writer *gzip.Writer
}
func (w *TweetWriter) tweetFile(tweet tweetItem) string { | |
t := time.Unix(0, tweet.Key()).In(time.UTC) | |
year := fmt.Sprintf("%04d", t.Year()) | |
month := fmt.Sprintf("%02d", t.Month()) | |
day := fmt.Sprintf("%02d", t.Day()) | |
hour := fmt.Sprintf("%02d", t.Hour()) | |
filename := w.name //filename := year + month + day + hour + "tweets.gz" | |
return path.Join(year, month, day, hour, filename) | |
} | |
func (w *TweetWriter) getWriter(tweet tweetItem) io.Writer { | |
p := w.tweetFile(tweet) | |
if p != w.current { | |
w.current = p | |
if w.out != nil { | |
w.writer.Close() | |
w.out.Close() | |
w.out = nil | |
w.writer = nil | |
} | |
f := path.Join("out", p) | |
err := os.MkdirAll(path.Dir(f), os.ModePerm) | |
if err != nil { | |
panic(err) | |
} | |
if out, err := os.Create(f); err == nil { | |
w.out = out | |
} else { | |
panic(err) | |
} | |
gw, err := gzip.NewWriterLevel(w.out, gzip.BestSpeed) | |
if err != nil { | |
panic(err) | |
} | |
w.writer = gw | |
} | |
return w.writer | |
} | |
func (w *TweetWriter) Write(item Item) { | |
ti := item.(tweetItem) | |
v := ti.Value() | |
// TODO: remove snappy encode decode? | |
t, err := snappy.Decode(nil, v) | |
if err != nil { | |
panic(err) | |
} | |
writer := w.getWriter(ti) | |
writer.Write(t) | |
writer.Write([]byte("\n")) | |
} | |
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
auth, err := aws.EnvAuth() | |
if err != nil { | |
log.Fatal(err) | |
} | |
s := s3.New(auth, aws.USEast) | |
sorted := s.Bucket("lctwee-sorted") | |
sortedResponse, err := sorted.List("", "/", "", 1000) | |
if err != nil { | |
log.Fatal(err) | |
} | |
remaining := []string{} | |
for _, s := range sortedResponse.Contents { | |
remaining = append(remaining, s.Key) | |
} | |
log.Println(remaining) | |
for len(remaining) > 0 { | |
name := remaining[rand.Int()%len(remaining)] | |
log.Println("name:", name) | |
body, err := sorted.GetReader(name) | |
if err != nil { | |
log.Fatal(err) | |
} | |
rc, err := gzip.NewReader(body) | |
if err != nil { | |
log.Fatal(err) | |
} | |
br := bufio.NewReaderSize(rc, 1<<24) | |
in := NewTweetReader(br) | |
out := &TweetWriter{name: name} | |
for { | |
t := in.Read() | |
if t == nil { | |
break | |
} else { | |
out.Write(t) | |
} | |
} | |
out.writer.Close() | |
out.out.Close() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment