Skip to content

Instantly share code, notes, and snippets.

@eikeon
Last active December 10, 2015 20:18
Show Gist options
  • Save eikeon/4487192 to your computer and use it in GitHub Desktop.
Save eikeon/4487192 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"compress/gzip"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"math/rand"
"os"
"path"
"runtime"
"sync"
"time"
"code.google.com/p/snappy-go/snappy"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
)
// tweetItem is a byte-slice record. In keyed form (as built by keyTweets) its
// layout is:
//
//	[0, 8)       little-endian int64 key
//	[8, 12)      little-endian uint32 payload length n
//	[12, 12+n)   payload
//
// NewTweetReader also uses tweetItem as a plain container for raw input lines
// before they are keyed; Key and Value are only meaningful on keyed records
// and panic if the slice is too short.
type tweetItem []byte

// Key decodes the int64 key stored little-endian in the first eight bytes.
func (i tweetItem) Key() int64 {
	header := i[:8]
	return int64(binary.LittleEndian.Uint64(header))
}

// Value returns the payload, whose length is read from bytes [8, 12). The
// result aliases the record; it is not a copy.
func (i tweetItem) Value() []byte {
	n := binary.LittleEndian.Uint32(i[8:12])
	end := 12 + n
	return i[12:end]
}

// Bytes returns the whole record, header included, as a plain byte slice.
func (i tweetItem) Bytes() []byte {
	return []byte(i)
}
// keyTweets reads raw tweet JSON lines from unkeyedTweets and sends keyed
// records on tweets until unkeyedTweets is closed. Each output record has the
// layout tweetItem.Key and tweetItem.Value decode:
//
//	[0, 8)       little-endian int64: created_at parsed as time.RubyDate, in Unix nanoseconds
//	[8, 12)      little-endian uint32: length l of the snappy-encoded line
//	[12, 12+l)   snappy-encoded raw line
//
// A line that is not valid JSON is logged and panics; a created_at that does
// not parse as time.RubyDate (including a missing field) is fatal.
func keyTweets(unkeyedTweets chan tweetItem, tweets chan tweetItem) {
	for tw := range unkeyedTweets {
		var tweet struct {
			CreatedAt string `json:"created_at"`
		}
		if err := json.Unmarshal(tw, &tweet); err != nil {
			log.Println("Could not unmarshal:", string(tw))
			panic(err)
		}
		t, err := time.Parse(time.RubyDate, tweet.CreatedAt)
		if err != nil {
			log.Fatal(err)
		}

		// Compress only once the line is known to be keyable.
		et, err := snappy.Encode(nil, tw)
		if err != nil {
			panic(err)
		}

		out := make(tweetItem, 12+len(et))
		binary.LittleEndian.PutUint64(out[0:8], uint64(t.UnixNano()))
		binary.LittleEndian.PutUint32(out[8:12], uint32(len(et)))
		copy(out[12:], et)
		tweets <- out
	}
}
// NewTweetReader starts a background pipeline over reader and returns a
// TweetReader that yields keyed tweets.
//
// One goroutine splits reader into lines. runtime.NumCPU() workers run
// keyTweets on those lines. Because the workers run concurrently, output
// order is not guaranteed to match input order. The returned reader's channel
// is closed once input is exhausted and every worker has finished.
//
// A line longer than reader's buffer, or any read error other than io.EOF
// (for example a corrupt or truncated upstream stream), is fatal. Such errors
// are never mistaken for a clean end of input.
func NewTweetReader(reader *bufio.Reader) *TweetReader {
	unkeyedTweets := make(chan tweetItem, 4096)
	tweets := make(chan tweetItem, 4096)

	// Producer: one line per item. Each line is copied because ReadLine's
	// result aliases the bufio.Reader's internal buffer.
	go func() {
		defer close(unkeyedTweets)
		for {
			line, isPrefix, err := reader.ReadLine()
			if err == io.EOF {
				return
			}
			if err != nil {
				log.Fatal(err)
			}
			if isPrefix {
				log.Fatal("line too long")
			}
			tb := make(tweetItem, len(line))
			copy(tb, line)
			unkeyedTweets <- tb
		}
	}()

	// Fan out keying across workers, then close tweets once all have drained
	// unkeyedTweets so Read can observe end-of-stream.
	go func() {
		var wg sync.WaitGroup
		for i := runtime.NumCPU(); i > 0; i-- {
			wg.Add(1)
			go func() {
				defer wg.Done()
				keyTweets(unkeyedTweets, tweets)
			}()
		}
		wg.Wait()
		close(tweets)
	}()

	return &TweetReader{tweets}
}
// TweetReader hands out the keyed tweets produced by the pipeline that
// NewTweetReader starts; consume them with Read.
type TweetReader struct {
	// tweets is closed by the pipeline when it is done. It is receive-only so
	// readers cannot send on or close it.
	tweets <-chan tweetItem
}
// Item is one record passed from a TweetReader to a TweetWriter.
type Item interface {
	// Bytes returns the record's underlying byte representation.
	Bytes() []byte
}
// Read blocks until the next keyed tweet is available and returns it. Once
// tr.tweets has been closed and drained, it returns a nil Item. Callers treat
// that nil as the end-of-stream signal.
func (tr *TweetReader) Read() (item Item) {
	next, open := <-tr.tweets
	if !open {
		return nil
	}
	return next
}
// TweetWriter routes tweets into gzip files at out/YYYY/MM/DD/HH/<name>. The
// hour comes from the tweet's key, read as Unix nanoseconds in UTC. At most
// one output file is open at a time. The zero value with name set is ready to
// use.
type TweetWriter struct {
	// name is the leaf file name of every output file.
	name string

	// current is the relative path of the open output file ("" if none).
	current string

	// out is the open output file, nil if none.
	out *os.File

	// writer is the gzip stream over out, nil if none.
	writer *gzip.Writer

	// opened records every relative path this writer has already created.
	// A later return to the same path appends instead of truncating.
	opened map[string]bool
}

// tweetFile returns the relative output path for tweet: YYYY/MM/DD/HH/<name>,
// using the tweet key's hour in UTC.
func (w *TweetWriter) tweetFile(tweet tweetItem) string {
	t := time.Unix(0, tweet.Key()).In(time.UTC)
	hourDir := fmt.Sprintf("%04d/%02d/%02d/%02d", t.Year(), t.Month(), t.Day(), t.Hour())
	return path.Join(hourDir, w.name)
}

// getWriter returns the gzip writer for the hour file that tweet belongs to.
// When the hour changes, it finishes and closes the previous file first.
//
// Tweets are keyed concurrently by NewTweetReader, so around an hour boundary
// they can arrive as A, B, A. The first open of a path truncates it, replacing
// output from an earlier run. Every later open of that path appends a new gzip
// member instead, so tweets already written are not lost. Concatenated gzip
// members decode as a single stream.
//
// Filesystem and compression errors panic.
func (w *TweetWriter) getWriter(tweet tweetItem) io.Writer {
	p := w.tweetFile(tweet)
	if p == w.current {
		return w.writer
	}
	w.closeCurrent()
	w.current = p

	f := path.Join("out", p)
	if err := os.MkdirAll(path.Dir(f), os.ModePerm); err != nil {
		panic(err)
	}
	if w.opened == nil {
		w.opened = make(map[string]bool)
	}
	flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
	if w.opened[p] {
		flag = os.O_WRONLY | os.O_CREATE | os.O_APPEND
	}
	out, err := os.OpenFile(f, flag, 0666)
	if err != nil {
		panic(err)
	}
	w.opened[p] = true

	gw, err := gzip.NewWriterLevel(out, gzip.BestSpeed)
	if err != nil {
		out.Close()
		panic(err)
	}
	w.out = out
	w.writer = gw
	return w.writer
}

// closeCurrent finishes the open gzip stream and closes its file, if any.
// Errors panic: a failed gzip Close means the stream's trailer was not
// written, so the file would be truncated.
func (w *TweetWriter) closeCurrent() {
	if w.out == nil {
		return
	}
	if err := w.writer.Close(); err != nil {
		panic(err)
	}
	if err := w.out.Close(); err != nil {
		panic(err)
	}
	w.out = nil
	w.writer = nil
}
// Write decodes item's snappy payload and appends it, followed by a newline,
// to the hour file selected by the item's key.
//
// item must be a tweetItem, as produced by TweetReader; any other type panics.
// Decode and write failures panic rather than silently dropping the tweet.
func (w *TweetWriter) Write(item Item) {
	ti := item.(tweetItem)
	// TODO: remove snappy encode decode?
	t, err := snappy.Decode(nil, ti.Value())
	if err != nil {
		panic(err)
	}
	writer := w.getWriter(ti)
	if _, err := writer.Write(t); err != nil {
		panic(err)
	}
	if _, err := writer.Write([]byte("\n")); err != nil {
		panic(err)
	}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
auth, err := aws.EnvAuth()
if err != nil {
log.Fatal(err)
}
s := s3.New(auth, aws.USEast)
sorted := s.Bucket("lctwee-sorted")
sortedResponse, err := sorted.List("", "/", "", 1000)
if err != nil {
log.Fatal(err)
}
remaining := []string{}
for _, s := range sortedResponse.Contents {
remaining = append(remaining, s.Key)
}
log.Println(remaining)
for len(remaining) > 0 {
name := remaining[rand.Int()%len(remaining)]
log.Println("name:", name)
body, err := sorted.GetReader(name)
if err != nil {
log.Fatal(err)
}
rc, err := gzip.NewReader(body)
if err != nil {
log.Fatal(err)
}
br := bufio.NewReaderSize(rc, 1<<24)
in := NewTweetReader(br)
out := &TweetWriter{name: name}
for {
t := in.Read()
if t == nil {
break
} else {
out.Write(t)
}
}
out.writer.Close()
out.out.Close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment