Skip to content

Instantly share code, notes, and snippets.

@eikeon
Created November 28, 2012 19:49
Show Gist options
  • Save eikeon/4163648 to your computer and use it in GitHub Desktop.
Save eikeon/4163648 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"compress/gzip"
"flag"
"fmt"
"io"
"log"
"os"
"os/exec"
"path"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
)
// download copies the S3 object called name from bucket b into /tmp and
// returns the path of the local copy.
//
// Every failure (creating the local file, opening the S3 object, copying the
// data, closing the file) terminates the process through log.Fatal.
func download(b *s3.Bucket, name string) string {
	log.Println("downloading:", name)
	localPath := path.Join("/tmp", name)
	// O_TRUNC: without it, a longer file left over from an earlier run would
	// keep its old tail after the freshly downloaded bytes.
	f, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		log.Fatal(err)
	}
	body, err := b.GetReader(name)
	if err != nil {
		log.Fatal(err)
	}
	_, err = io.Copy(f, body)
	// Release the S3 body whether or not the copy succeeded.
	body.Close()
	if err != nil {
		log.Fatal(err)
	}
	// Close explicitly so the descriptor is not leaked once per downloaded
	// part and so a failure surfaced at close time is not missed.
	if err := f.Close(); err != nil {
		log.Fatal(err)
	}
	return localPath
}
// upload stores the local file at path match in bucket b, using the file's
// base name as the object key and a private ACL.
//
// Failing to open or stat the local file terminates the process through
// log.Fatal; the error from the S3 put itself is returned to the caller.
//
// NOTE(review): the content type is always "application/x-lzop", while the
// caller visible in this file passes a ".gz" path — confirm this is intended.
func upload(b *s3.Bucket, match string) error {
	name := path.Base(match)
	log.Println("uploading:", name)
	f, err := os.Open(match)
	if err != nil {
		log.Fatal(err)
	}
	// The descriptor was previously never released.
	defer f.Close()
	// Stat the open handle so the reported length belongs to the very file
	// that is about to be streamed.
	fi, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}
	return b.PutReader(name, bufio.NewReader(f), fi.Size(), "application/x-lzop", s3.Private)
}
// main processes the numbered parts in the inclusive range [-start, -end].
//
// For each part it downloads "part-00000-NNNN.lzo" from the "eikeon-testing"
// bucket, pipes it through "lzop -fdc", feeds the decompressed stream to sort,
// writes the result gzip-compressed to /tmp, uploads that file and finally
// removes both local files. Any failure aborts the program.
func main() {
	const partName = "part-00000"
	start := flag.Int("start", 0, "")
	end := flag.Int("end", 0, "")
	flag.Parse()
	auth, err := aws.EnvAuth()
	if err != nil {
		log.Fatal(err)
	}
	s := s3.New(auth, aws.USEast)
	b := s.Bucket("eikeon-testing")
	for number := *start; number <= *end; number++ {
		name := fmt.Sprintf("%s-%04d.lzo", partName, number)
		sortedName := fmt.Sprintf("%s-%04d-sorted.gz", partName, number)
		localPath := download(b, name)

		readCmd := exec.Command("lzop", "-fdc", localPath)
		rc, err := readCmd.StdoutPipe()
		if err != nil {
			log.Println("could not connect to standard out")
			panic(err)
		}
		if err = readCmd.Start(); err != nil {
			log.Println("error starting command")
			panic(err)
		}

		sortedPath := path.Join("/tmp", sortedName)
		// O_TRUNC keeps a stale, longer file from a previous run from leaving
		// trailing bytes after the new gzip stream.
		f, err := os.OpenFile(sortedPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
		if err != nil {
			log.Fatal(err)
		}
		wc, err := gzip.NewWriterLevel(f, gzip.BestCompression)
		if err != nil {
			log.Fatal(err)
		}

		sort(rc, wc)
		rc.Close()
		// Closing the gzip writer flushes the remaining compressed data; a
		// failure here means the output file is incomplete.
		if err := wc.Close(); err != nil {
			log.Fatal(err)
		}
		// The gzip writer does not close the underlying file, so close it
		// here instead of leaking one descriptor per part.
		if err := f.Close(); err != nil {
			log.Fatal(err)
		}
		if err := readCmd.Wait(); err != nil {
			log.Println("error waiting on read command")
			log.Fatal(err)
		}

		// Do not delete the local files if the upload did not succeed.
		if err := upload(b, sortedPath); err != nil {
			log.Fatal(err)
		}
		if err := os.Remove(localPath); err != nil {
			log.Fatal(err)
		}
		if err := os.Remove(sortedPath); err != nil {
			log.Fatal(err)
		}
	}
}
package main
import (
"bufio"
"encoding/binary"
"encoding/json"
"io"
"log"
"runtime"
"sync"
"time"
"code.google.com/p/snappy-go/snappy"
"github.com/eikeon/funnelsort"
)
// tweetItem is one sortable record stored as a flat byte slice:
//
//	bytes 0..7   sort key, little-endian
//	bytes 8..11  payload length, little-endian uint32
//	bytes 12..   payload
type tweetItem []byte

// Key decodes the 8-byte little-endian sort key at the front of the record.
func (i tweetItem) Key() int64 {
	raw := binary.LittleEndian.Uint64(i[:8])
	return int64(raw)
}

// Value returns the payload bytes that follow the 12-byte header, using the
// length stored in bytes 8..11. The result aliases the record's storage.
func (i tweetItem) Value() []byte {
	const headerLen = 12
	size := binary.LittleEndian.Uint32(i[8:headerLen])
	return i[headerLen : headerLen+size]
}
// Less reports whether this record's key is strictly smaller than the key of
// b. b must hold a tweetItem; any other dynamic type makes the type assertion
// panic.
func (i tweetItem) Less(b funnelsort.Item) bool {
	mine := i.Key()
	theirs := b.(tweetItem).Key()
	return mine < theirs
}
// Bytes exposes the whole record (header and payload) as a plain byte slice
// without copying.
func (i tweetItem) Bytes() []byte {
	return []byte(i)
}
// NewItem wraps the record at the start of b as a funnelsort.Item. The record
// length is 12 header bytes plus the little-endian uint32 payload length read
// from b[8:12]; the returned item shares b's storage.
func NewItem(b []byte) funnelsort.Item {
	const headerLen = 12
	size := binary.LittleEndian.Uint32(b[8:headerLen])
	record := b[:headerLen+size]
	return tweetItem(record)
}
// keyTweets drains unkeyedTweets, turning every raw JSON line into a keyed
// record that it sends on tweets. It returns once unkeyedTweets is closed and
// empty.
//
// Each record is a 12-byte header followed by the snappy-encoded line: the
// Created_At timestamp (parsed as time.RubyDate) in Unix nanoseconds as 8
// little-endian bytes, then the encoded length as 4 little-endian bytes.
//
// A snappy failure or a line that is not valid JSON panics; an unparsable
// timestamp ends the process through log.Fatal.
func keyTweets(unkeyedTweets chan tweetItem, tweets chan tweetItem) {
	const headerLen = 12
	for raw := range unkeyedTweets {
		compressed, err := snappy.Encode(nil, raw)
		if err != nil {
			panic(err)
		}

		var tweet struct {
			Created_At string
		}
		if err := json.Unmarshal(raw, &tweet); err != nil {
			log.Println("Could not unmarshal:", string(raw))
			panic("")
		}

		created, err := time.Parse(time.RubyDate, tweet.Created_At)
		if err != nil {
			log.Fatal(err)
		}

		record := make(tweetItem, headerLen+len(compressed))
		binary.LittleEndian.PutUint64(record[:8], uint64(created.UnixNano()))
		binary.LittleEndian.PutUint32(record[8:headerLen], uint32(len(compressed)))
		copy(record[headerLen:], compressed)
		tweets <- record
	}
}
// NewTweetReader starts the pipeline that turns the lines of reader into
// keyed records and returns a TweetReader that yields them.
//
// One goroutine reads lines and copies each into its own buffer; a pool of
// runtime.NumCPU() goroutines runs keyTweets over those lines. The output
// channel is closed once every worker has finished, so records arrive in no
// particular order.
//
// A line that does not fit in reader's buffer, or a read error other than
// io.EOF, ends the process through log.Fatal.
func NewTweetReader(reader *bufio.Reader) *TweetReader {
	unkeyedTweets := make(chan tweetItem, 4096)
	tweets := make(chan tweetItem, 4096)

	go func() {
		defer close(unkeyedTweets)
		for {
			b, isPrefix, err := reader.ReadLine()
			if err != nil {
				// Only a clean end of input may stop the stream quietly;
				// treating other errors as EOF would silently truncate the
				// sorted output.
				if err != io.EOF {
					log.Fatal("reading input: ", err)
				}
				return
			}
			if isPrefix {
				log.Fatal("line too long")
			}
			// ReadLine's slice is only valid until the next read, so copy it
			// before handing it to another goroutine.
			tb := make(tweetItem, len(b))
			copy(tb, b)
			unkeyedTweets <- tb
		}
	}()

	go func() {
		var wg sync.WaitGroup
		for i := runtime.NumCPU(); i > 0; i-- {
			wg.Add(1)
			go func() {
				defer wg.Done()
				keyTweets(unkeyedTweets, tweets)
			}()
		}
		wg.Wait()
		close(tweets)
	}()

	return &TweetReader{tweets}
}
// TweetReader hands out keyed records received from a channel.
type TweetReader struct {
	tweets chan tweetItem
}

// Read blocks for the next record on the channel and returns it. Once the
// channel is closed and drained it returns a nil Item.
func (tr *TweetReader) Read() (item funnelsort.Item) {
	if tw, ok := <-tr.tweets; ok {
		return tw
	}
	// Return an untyped nil so the interface value itself is nil.
	return nil
}
// TweetWriter writes records to an io.Writer as newline-terminated lines.
type TweetWriter struct {
	writer io.Writer
}

// Write snappy-decodes the payload of item and writes it to the underlying
// writer followed by a newline. item must hold a tweetItem.
//
// The method has no error result, so a decode failure or a failed write
// panics rather than being dropped.
func (w *TweetWriter) Write(item funnelsort.Item) {
	ti := item.(tweetItem)
	t, err := snappy.Decode(nil, ti.Value())
	if err != nil {
		panic(err)
	}
	// Write errors were previously ignored, which could leave the output
	// silently incomplete.
	if _, err := w.writer.Write(t); err != nil {
		panic(err)
	}
	if _, err := w.writer.Write([]byte("\n")); err != nil {
		panic(err)
	}
}
// sort reads newline-separated tweets from r, orders them with funnelsort
// using the key produced by NewTweetReader's pipeline, and writes them to w
// one per line.
//
// Both streams are wrapped in 16 MiB (1<<24) buffers; the read buffer size is
// also the longest line NewTweetReader accepts. The function has no error
// result, so a failed final flush ends the process through log.Fatal.
func sort(r io.Reader, w io.Writer) {
	runtime.GOMAXPROCS(runtime.NumCPU())
	funnelsort.NewItem = NewItem
	br := bufio.NewReaderSize(r, 1<<24)
	bw := bufio.NewWriterSize(w, 1<<24)
	in := NewTweetReader(br)
	funnelsort.FunnelSort(in, &TweetWriter{bw})
	// Flush reports any write error the buffered writer ran into; ignoring
	// it would silently lose the tail of the output.
	if err := bw.Flush(); err != nil {
		log.Fatal(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment