Skip to content

Instantly share code, notes, and snippets.

@eikeon
Created January 22, 2013 19:45
Show Gist options
  • Save eikeon/4597778 to your computer and use it in GitHub Desktop.
Save eikeon/4597778 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"io"
"flag"
"log"
"fmt"
"os"
"os/exec"
"path"
"time"
"github.com/stathat/go"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
"launchpad.net/~prudhvikrishna/goamz/sqs"
)
// download copies the object called name from bucket b into a local file
// under /mnt and returns the path of that file.
//
// Any failure (creating the file, opening the object, copying, or closing
// the file) terminates the program via log.Fatal.
func download(b *s3.Bucket, name string) string {
	log.Println("downloading:", name)
	localPath := path.Join("/mnt", name)

	// O_TRUNC: if a longer file with the same name was left behind by an
	// interrupted run, its trailing bytes must not survive the new download.
	f, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		log.Fatal(err)
	}

	body, err := b.GetReader(name)
	if err != nil {
		log.Fatal(err)
	}

	_, err = io.Copy(f, body)
	body.Close()
	if err != nil {
		log.Fatal(err)
	}

	// Close explicitly (and check the result) so that the descriptor is not
	// leaked once per downloaded object and write errors reported at close
	// time are not lost.
	if err := f.Close(); err != nil {
		log.Fatal(err)
	}
	return localPath
}
func main() {
createQ := flag.Bool("createQ", false, "create and populate the queue")
flag.Parse()
auth, err := aws.EnvAuth()
if err != nil {
log.Fatal(err)
}
s := s3.New(auth, aws.USEast)
ssqs := sqs.New(auth, aws.USEast)
unsorted := s.Bucket("twitter-unsorted")
queue_name := "twitter-unsorted-count-q"
if *createQ {
timeOutAttribute := sqs.Attribute{"VisibilityTimeout", "3600"}
maxMessageSizeAttribute := sqs.Attribute{"MaximumMessageSize", "65536"}
messageRetentionAttribute := sqs.Attribute{"MessageRetentionPeriod", "345600"}
q, err := ssqs.CreateQueue(queue_name, []sqs.Attribute{timeOutAttribute, maxMessageSizeAttribute, messageRetentionAttribute})
if err != nil {
log.Fatal("failed to get queue:", err)
panic(err)
}
retry:
unsortedResponse, err := unsorted.List("", "/", "", 1000)
if err != nil {
log.Println("error listing unsorted:", err)
time.Sleep(10 * time.Second)
goto retry
}
for _, u := range unsortedResponse.Contents {
q.SendMessage(u.Key)
}
}
unsortedCount := s.Bucket("twitter-unsorted-count")
err = unsortedCount.PutBucket(s3.Private)
if err != nil {
log.Println(err)
}
unsortedLast := s.Bucket("twitter-unsorted-last")
err = unsortedLast.PutBucket(s3.Private)
if err != nil {
log.Println(err)
}
q, err := ssqs.GetQueue(queue_name)
if err != nil {
log.Fatal("failed to get queue:", err)
panic(err)
}
for {
resp, err := q.ReceiveMessage([]string{"All"}, 1, 3600)
if err != nil {
log.Fatal(err)
}
if len(resp.Messages) == 0 {
break
}
for _, message := range resp.Messages {
name := message.Body
local_path := download(unsorted, name)
read_cmd := exec.Command("lzop", "-fdc", local_path)
rc, err := read_cmd.StdoutPipe()
if err != nil {
log.Println("could not connect to standard out")
panic(err)
}
if err = read_cmd.Start(); err != nil {
log.Println("error starting command")
panic(err)
}
count := 0
reader := bufio.NewReaderSize(rc, 1<<24)
var last []byte
for {
if b, isPrefix, err := reader.ReadLine(); err == nil {
if isPrefix == true {
log.Fatal("line too long")
}
count++
last = b
} else if err == io.EOF {
break
} else {
log.Fatal("Error reading line:", err)
}
}
rc.Close()
retryCount:
err = unsortedCount.Put(name, []byte(fmt.Sprintf("%d", count)), "text/string", s3.Private)
if err != nil {
log.Println("error putting count:", err)
time.Sleep(time.Second)
goto retryCount
}
retryLast:
err = unsortedLast.Put(name, last, "text/string", s3.Private)
if err!= nil {
log.Println("error putting last:", err)
time.Sleep(time.Second)
goto retryLast
}
if err := stathat.PostEZCount("unsorted count", "eikeon@eikeon.com", 1); err != nil {
log.Printf("error posting unsorted count: %v", err)
}
if err := read_cmd.Wait(); err != nil {
log.Println("error waiting on read command")
log.Fatal(err)
}
err = os.Remove(local_path)
if err != nil {
log.Println(err)
}
retryDeleteMessage:
_, err = q.DeleteMessage(message.ReceiptHandle)
if err != nil {
log.Println("error deleting message:", err)
time.Sleep(time.Second)
goto retryDeleteMessage
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment