Skip to content

Instantly share code, notes, and snippets.

@boo1ean
Created July 20, 2014 11:45
Show Gist options
  • Save boo1ean/20aa10051583aba3bd76 to your computer and use it in GitHub Desktop.
Save boo1ean/20aa10051583aba3bd76 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"strings"
"sync"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
)
// Pollution is the subset of a "pollutions" collection document that this
// program decodes.
type Pollution struct {
	// ID is the document's _id.
	ID bson.ObjectId `bson:"_id,omitempty"`

	// ESRI_Key is read from the document's "ESRI_Key" field. ProcessPollution
	// uses the part before the first "<<=>>" separator as the waterbody
	// lookup key.
	ESRI_Key string `bson:"ESRI_Key,omitempty"`
}
// Waterbody is the subset of a "waterbodies" collection document that this
// program decodes.
type Waterbody struct {
	// ID is the document's _id.
	ID bson.ObjectId `bson:"_id,omitempty"`

	// ESRI_Key is read from the document's "LISTED_WATER_ID" field; note that
	// the Go field name does not match the stored field name.
	ESRI_Key string `bson:"LISTED_WATER_ID,omitempty"`
}
// maxConcurrency is the buffer size of the throttle channel, i.e. the number
// of tokens it can hold before a send blocks.
const maxConcurrency = 16

// throttle bounds the number of in-flight workers: main sends a token before
// starting each ProcessPollution goroutine (blocking once the buffer is full)
// and ProcessPollution receives one token when it is done.
var throttle = make(chan int, maxConcurrency)
// main connects to MongoDB on 127.0.0.1, loads every document of the
// "pollutions" collection in database "mydb" into memory, and runs
// ProcessPollution for each one in its own goroutine. The throttle channel
// limits how many goroutines are started before earlier ones finish. Any
// connection or query error panics.
func main() {
	session, err := mgo.Dial("127.0.0.1")
	if err != nil {
		panic(err)
	}
	defer session.Close()
	session.SetMode(mgo.Monotonic, true)

	// Fetch the complete "pollutions" collection in one query.
	var pollutions []Pollution
	if err := session.DB("mydb").C("pollutions").Find(nil).All(&pollutions); err != nil {
		panic(err)
	}

	total := len(pollutions)
	var waitGroup sync.WaitGroup
	waitGroup.Add(total)
	fmt.Println("Total documents:", total)

	for idx := range pollutions {
		// Blocks while the throttle buffer is full, i.e. until a running
		// worker gives a token back.
		throttle <- 1
		go ProcessPollution(idx, pollutions[idx], &waitGroup, session)
	}

	// Wait for every worker to call Done before reporting completion.
	waitGroup.Wait()
	fmt.Println("Everyting is done!")
}
// ProcessPollution looks up the waterbody matching a single pollution
// document. The lookup key is the part of p.ESRI_Key before the first "<<=>>"
// separator, matched against the "LISTED_WATER_ID" field of the "waterbodies"
// collection in database "mydb".
//
// Parameters:
//   - i: index of p in the caller's slice; used only in error output.
//   - p: the pollution document to process.
//   - waitGroup: Done is called on it exactly once when the function returns.
//   - session: base session; the function works on its own Copy and closes
//     that copy before returning.
//
// The function also takes one token out of the package-level throttle channel
// on return, giving back the slot the caller reserved before starting it.
// Query errors other than mgo.ErrNotFound are printed rather than ignored.
func ProcessPollution(i int, p Pollution, waitGroup *sync.WaitGroup, session *mgo.Session) {
	defer waitGroup.Done()
	// Deferred so the slot is released on every exit path. It is registered
	// before the session copy's Close, so (defers run last-in-first-out) the
	// copy is closed before the next worker is allowed to start.
	defer func() { <-throttle }()

	// Split always yields at least one element for a non-empty separator, so
	// indexing [0] is safe even when the key has no "<<=>>" in it.
	key := strings.Split(p.ESRI_Key, "<<=>>")[0]

	sessionCopy := session.Copy()
	defer sessionCopy.Close()

	var waterbody Waterbody
	err := sessionCopy.DB("mydb").C("waterbodies").Find(bson.M{"LISTED_WATER_ID": key}).One(&waterbody)
	if err != nil && err != mgo.ErrNotFound {
		// A missing waterbody is an expected outcome; anything else is a real
		// failure that should not be silently dropped.
		fmt.Printf("pollution %d: looking up waterbody %q: %v\n", i, key, err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment