Skip to content

Instantly share code, notes, and snippets.

@eikeon
Last active December 11, 2015 12:29
Show Gist options
  • Save eikeon/4601373 to your computer and use it in GitHub Desktop.
Save eikeon/4601373 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"time"
"github.com/eikeon/tweet"
"github.com/stathat/go"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
"launchpad.net/~prudhvikrishna/goamz/sqs"
)
// main drains the "twitter-count-q" SQS queue. For every message it opens the
// object named <message body>+"tweet.gz" in the "twitter-merged" bucket,
// counts how many non-nil items the tweet reader yields, and stores that count
// (as decimal text) under the same name in the "twitter-merged-count" bucket.
// After the count is stored it reports one "merged count" event to StatHat and
// deletes the message.
//
// Credential lookup, queue lookup, opening the reader, storing the count and
// deleting the message are each retried indefinitely with a fixed delay. A
// failure to receive messages terminates the process, and the program exits
// normally the first time a receive returns no messages.
func main() {
	// Keep trying until credentials are available from the environment.
	auth, err := aws.EnvAuth()
	for err != nil {
		log.Println("EnvAuth Failed:", err)
		time.Sleep(time.Second)
		auth, err = aws.EnvAuth()
	}

	ss3 := s3.New(auth, aws.USEast)
	ssqs := sqs.New(auth, aws.USEast)
	merged := ss3.Bucket("twitter-merged")
	mergedCount := ss3.Bucket("twitter-merged-count")

	q, err := ssqs.GetQueue("twitter-count-q")
	for err != nil {
		log.Println("error getting queue:", err)
		time.Sleep(10 * time.Second)
		q, err = ssqs.GetQueue("twitter-count-q")
	}

	for {
		// NOTE(review): the arguments are presumably (attributes, max
		// messages, visibility timeout in seconds) -- confirm against the
		// sqs package in use.
		resp, err := q.ReceiveMessage([]string{"All"}, 1, 3600)
		if err != nil {
			log.Fatalln("error receiving message:", err)
		}
		// NOTE(review): an empty response ends the program. With SQS short
		// polling an empty response does not guarantee an empty queue --
		// confirm that exiting here is intended.
		if len(resp.Messages) == 0 {
			break
		}

		for _, message := range resp.Messages {
			name := message.Body + "tweet.gz"

			reader, err := tweet.GetReader(merged, name)
			for err != nil {
				log.Println("error getting reader:", err)
				time.Sleep(100 * time.Millisecond)
				reader, err = tweet.GetReader(merged, name)
			}

			// Count items until the reader yields nil.
			// NOTE(review): a nil item may mean end of data or a read
			// error, and the reader is never closed here -- verify against
			// the tweet package.
			var count int64
			for reader.Read() != nil {
				count++
			}
			log.Println("name:", name, "count:", count)

			// The payload is a plain decimal number, so upload it as
			// text/plain ("text/string" is not a registered media type).
			payload := []byte(fmt.Sprintf("%d", count))
			for {
				err = mergedCount.Put(name, payload, "text/plain", s3.Private)
				if err == nil {
					break
				}
				log.Println("error putting count:", err)
				time.Sleep(time.Second)
			}

			// Stats reporting is best-effort: log the failure and carry on.
			if err := stathat.PostEZCount("merged count", "eikeon@eikeon.com", 1); err != nil {
				log.Printf("error posting merged count: %v", err)
			}

			// Delete only after the count is stored, so a crash before this
			// point leaves the message on the queue.
			for {
				_, err = q.DeleteMessage(message.ReceiptHandle)
				if err == nil {
					break
				}
				log.Println("error deleting message:", err)
				time.Sleep(time.Second)
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment