Skip to content

Instantly share code, notes, and snippets.

@shavit
Created September 3, 2016 14:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shavit/0b135cf8542e8e2bc4756cc59346b4c8 to your computer and use it in GitHub Desktop.
Save shavit/0b135cf8542e8e2bc4756cc59346b4c8 to your computer and use it in GitHub Desktop.
Go worker with MongoDB capped collection
package main
import (
"os"
"log"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"time"
)
// Job is a single document read from the jobs collection
// (a capped collection without index).
type Job struct {
	// Id maps to the document's "_id" field.
	Id bson.ObjectId `json:"_id" bson:"_id"`
	// Name maps to the document's "name" field.
	Name string `json:"name" bson:"name"`
	// CreatedAt maps to the document's "createdAt" field.
	CreatedAt time.Time `json:"createdAt" bson:"createdAt"`
}
func main(){
// change this to your capped collection name
jobsCollectionName := "jobs"
var job Job
const tailTimeout time.Duration = 120 * time.Second
// connect
con, err := mgo.Dial(os.Getenv("MONGO_URL"))
if err != nil {
log.Printf("Error connecting to MongoDB: %v\n", err)
}
defer con.Close()
dbSession := con.DB(jobsCollectionName)
// find all the jobs
jobsTail := dbSession.C(collectionName).Find(bson.M{}).Tail(tailTimeout)
defer jobsTail.Close()
for {
for jobsTail.Next(&job){
// do the work here
log.Printf("Found job: %v created at %v\n", job.Name, job.CreatedAt)
}
if jobsTail.Err() != nil {
jobsTail.Close()
}
if jobsTail.Timeout(){
log.Println("Job timout")
continue
}
// Restart and connect again to collection
jobsTail = dbSession.C(collectionName).Find(bson.M{}).Tail(tailTimeout)
log.Println("Reconnecting to the collection")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment