Skip to content

Instantly share code, notes, and snippets.

@eikeon
Last active December 10, 2015 18:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eikeon/4478036 to your computer and use it in GitHub Desktop.
Save eikeon/4478036 to your computer and use it in GitHub Desktop.
package main
import (
"io"
"log"
"os"
"os/exec"
"path"
"strings"
"time"
"github.com/eikeon/tweet"
"github.com/stathat/go"
"launchpad.net/goamz/aws"
"launchpad.net/goamz/s3"
"launchpad.net/~prudhvikrishna/goamz/sqs"
)
// sortandsplit processes one object from the unsorted bucket:
//
//  1. downloads the object called name to /mnt/<name>,
//  2. decompresses it by running `lzop -fdc` on the local copy,
//  3. feeds the decompressed stream through tweet.Sort into a split tweet
//     writer created for the sorted bucket (keyed by name with the first
//     ".lzo" removed),
//  4. removes the local copy and posts a "sortandsplit" count to StatHat.
//
// Failures to obtain the S3 reader are retried once per second without limit.
// Any other failure in the download/decompress path terminates the process via
// log.Fatal; a failure to remove the local file is only logged.
func sortandsplit(name string, unsorted, sorted *s3.Bucket) {
	log.Println("downloading:", name)

	// Keep retrying until S3 hands back a reader; there is no retry cap.
	body, err := unsorted.GetReader(name)
	for err != nil {
		log.Println("error getting reader:", err)
		time.Sleep(time.Second)
		body, err = unsorted.GetReader(name)
	}

	localPath := path.Join("/mnt", name)

	// O_TRUNC matters: without it, a longer stale file left at the same path
	// would keep its old tail after the new download and corrupt lzop's input.
	f, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		log.Println("could not open:", localPath)
		log.Fatal("error opening file:", err)
	}
	if _, err = io.Copy(f, body); err != nil {
		log.Fatal("error copying:", err)
	}
	// Close before lzop reads the file so the descriptor is not leaked across
	// calls and any write error reported at close time is not silently lost.
	if err = f.Close(); err != nil {
		log.Fatal("error closing file:", err)
	}
	body.Close()

	readCmd := exec.Command("lzop", "-fdc", localPath)
	rc, err := readCmd.StdoutPipe()
	if err != nil {
		log.Fatal("could not connect to standard out:", err)
	}
	if err = readCmd.Start(); err != nil {
		log.Fatal("error starting command:", err)
	}

	tw := tweet.NewSplitTweetWriter(strings.Replace(name, ".lzo", "", 1), sorted)
	// NOTE(review): any values returned by tweet.Sort and tw.Close are not
	// inspected here — confirm against the tweet package whether they report errors.
	tweet.Sort(rc, tw)
	rc.Close()
	tw.Close()

	// Wait only after the pipe has been consumed and closed.
	if err := readCmd.Wait(); err != nil {
		log.Println("error waiting on read command")
		log.Fatal(err)
	}

	if err = os.Remove(localPath); err != nil {
		log.Println("error removing:", err)
	}

	// Report the completed unit of work without blocking the caller. Nothing
	// waits for this goroutine, so the post is best-effort.
	go func() {
		if err := stathat.PostEZCount("sortandsplit", "eikeon@eikeon.com", 1); err != nil {
			log.Printf("error posting sortandsplit: %v", err)
		}
	}()
}
// main authenticates from the environment, opens the "twitter-unsorted" and
// "lctwee-sorted" S3 buckets and the "twitter-sortandsplit-q" SQS queue, then
// repeatedly receives messages from the queue. Each message body is passed to
// sortandsplit as the object name, after which the message is deleted. The
// program returns once a receive yields no messages.
//
// Queue lookup, receive and delete errors are each retried once per second
// with no upper bound on attempts.
func main() {
	auth, err := aws.EnvAuth()
	if err != nil {
		log.Fatal("envauth error:", err)
	}

	store := s3.New(auth, aws.USEast)
	unsorted, sorted := store.Bucket("twitter-unsorted"), store.Bucket("lctwee-sorted")

	queues := sqs.New(auth, aws.USEast)
	q, err := queues.GetQueue("twitter-sortandsplit-q")
	for err != nil {
		log.Println("error getting queue:", err)
		time.Sleep(time.Second)
		q, err = queues.GetQueue("twitter-sortandsplit-q")
	}

	for {
		// NOTE(review): the arguments 1 and 3600 are presumably the maximum
		// message count and the visibility timeout in seconds — confirm
		// against the sqs package.
		resp, err := q.ReceiveMessage([]string{"All"}, 1, 3600)
		for err != nil {
			log.Println("Error receiving message:", err)
			time.Sleep(time.Second)
			resp, err = q.ReceiveMessage([]string{"All"}, 1, 3600)
		}

		if len(resp.Messages) == 0 {
			log.Println("No messages in queue. We're done.")
			return
		}

		for _, message := range resp.Messages {
			part := message.Body
			log.Println("sortandsplit:", part)
			sortandsplit(part, unsorted, sorted)

			// Delete only after the work is done; keep trying until it succeeds.
			_, err = q.DeleteMessage(message.ReceiptHandle)
			for err != nil {
				log.Println("error deleting message:", err)
				time.Sleep(time.Second)
				_, err = q.DeleteMessage(message.ReceiptHandle)
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment