Last active
March 14, 2017 11:20
-
-
Save smoya/7974b07539e33a9cabae03e762ce7f4a to your computer and use it in GitHub Desktop.
Concurrent queries to MongoDB using mgo end in closed sockets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '2'
services:
  mongo:
    image: mongo:3.4.2
    ports:
      - "27018:27017"
    restart: always
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"flag" | |
"gopkg.in/mgo.v2" | |
) | |
var numOfInserts = flag.Int("inserts", 2000, "Number of inserts. -inserts=2000") | |
// Person is the document type this program inserts into the "person"
// collection.
type Person struct {
	// Name is the person's name.
	Name string
}

// guy is the single fixture document that every insert writes.
var guy = Person{Name: "Guybrush"}
func main() { | |
flag.Parse() | |
masterSession := connect() | |
defer masterSession.Close() | |
wg := sync.WaitGroup{} | |
// inserting without copying the session. | |
wg.Add(*numOfInserts) | |
for i := 0; i < *numOfInserts; i++ { | |
go doInsert(&wg, masterSession) | |
} | |
wg.Wait() | |
fmt.Println("Inserted WITH NO session copy: ", count(masterSession)) | |
// We ensure that the collection is empty. | |
clear(masterSession) | |
// inserting copying the session. | |
wg.Add(*numOfInserts) | |
for i := 0; i < *numOfInserts; i++ { | |
go doInsertCopyingSession(&wg, masterSession) | |
} | |
wg.Wait() | |
fmt.Println("Inserted WITH session copy: ", count(masterSession)) | |
// We ensure that the collection is empty. | |
clear(masterSession) | |
} | |
func clear(session *mgo.Session) { | |
err := session.DB("").C("person").DropCollection() | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
func count(session *mgo.Session) int { | |
count, err := session.DB("").C("person").Count() | |
if err != nil { | |
log.Fatal(err) | |
} | |
return count | |
} | |
func doInsertCopyingSession(wg *sync.WaitGroup, session *mgo.Session) { | |
ses := session.Copy() | |
defer ses.Close() | |
doInsert(wg, ses) | |
} | |
func doInsert(wg *sync.WaitGroup, session *mgo.Session) { | |
defer wg.Done() | |
err := session.DB("").C("person").Insert(guy) | |
if err != nil { | |
log.Println("[error] during insert: ", err) | |
} | |
} | |
func connect() *mgo.Session { | |
session, err := mgo.Dial("localhost:27018/dev") | |
if err != nil { | |
log.Fatal(err) | |
} | |
// For this test we don't care about up-to-date reads. | |
session.SetMode(mgo.Monotonic, true) | |
return session | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment