Skip to content

Instantly share code, notes, and snippets.

@eikeon
Last active December 11, 2015 05:08
Show Gist options
  • Save eikeon/4549731 to your computer and use it in GitHub Desktop.
Save eikeon/4549731 to your computer and use it in GitHub Desktop.
package main
import (
"log"
"time"
"github.com/eikeon/funnelsort"
"github.com/eikeon/tweet"
"github.com/stathat/go"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
"launchpad.net/~prudhvikrishna/goamz/sqs"
)
func main() {
auth:
auth, err := aws.EnvAuth()
if err != nil {
log.Println("EnvAuth Failed:", err)
time.Sleep(time.Second)
goto auth
}
ss3 := s3.New(auth, aws.USEast)
ssqs := sqs.New(auth, aws.USEast)
sorted := ss3.Bucket("lctwee-sorted")
merged := ss3.Bucket("twitter-merged")
getQueue:
q, err := ssqs.GetQueue("twitter-merge-q")
if err != nil {
log.Println("error getting queue:", err)
time.Sleep(10 * time.Second)
goto getQueue
}
for {
receiveMessage:
resp, err := q.ReceiveMessage([]string{"All"}, 1, 3600)
if err != nil {
log.Println("Error receiving message:", err)
time.Sleep(time.Second)
goto receiveMessage
}
if len(resp.Messages) == 0 {
log.Println("No messages in queue. We're done.")
break
}
for _, message := range resp.Messages {
hour := message.Body
log.Println("merging:", hour)
var cs []funnelsort.Buffer
list:
response, err := sorted.List(hour, "/", "", 1000)
if err != nil {
log.Println("error listing hour:", err)
time.Sleep(time.Second)
goto list
}
for _, u := range response.Contents {
path := u.Key
getBuffer:
buffer, err := tweet.GetBuffer(sorted, path)
if err != nil {
log.Println("error getting buffer:", err)
time.Sleep(100 * time.Millisecond)
goto getBuffer
}
cs = append(cs, buffer)
}
out := tweet.NewS3TweetWriter(hour+"tweet.gz", merged)
tweet.Merge(cs, out)
go func() {
if err := stathat.PostEZCount("merge", "eikeon@eikeon.com", 1); err != nil {
log.Printf("error posting merge: %v", err)
}
}()
deleteMessage:
_, err = q.DeleteMessage(message.ReceiptHandle)
if err != nil {
log.Println("error deleting message:", err)
time.Sleep(100 * time.Millisecond)
goto deleteMessage
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment