Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
// This is a reduced test case for
// What it does is to overload DialServer() and return a custom net.Conn.
// Some of these custom connections timeout _once_ after the first 1000 bytes are read.
// It seems like DialServer() is called more times than the PoolLimit, which is
// set to 5, and not all these sockets are used, so for convenience, we pick
// every third connection to automatically time out once.
// It is still possible for this test to be slightly flaky, you may have to run it more than once.
// Remember to edit the Dial URL and the database name in insert to something valid.
// Running this test with `go run ./main.go` will cause ~11 sockets to be
// created. Some of these will time out, at which point, since the server's
// `abended` is not reset, every subsequent query, even through sockets that
// are _NOT_ failed, will keep performing the authentication cycle. This will
// lead to output like:
// Having to master check 0xc420014a20
// Having to master check 0xc4200da060
// Having to master check 0xc4200da4c0
// Having to master check 0xc420014bc0
// ...
// This output is from the Write() snooping we are doing and means that the
// client is verifying master on every socket, as described in #254.
package main
import (
type myConn struct {
bytesRead uint64
tripped bool
func (c *myConn) Read(b []byte) (n int, err error) {
if c.bytesRead == 0 && !c.tripped {
fmt.Printf("Going to eventually trip %p\n", c)
n, err = c.Conn.Read(b)
c.bytesRead += uint64(n)
if c.bytesRead > 1000 && !c.tripped {
fmt.Printf("Socket not tripped yet, timing out %p\n", c)
time.Sleep(5 * time.Second)
c.tripped = true
return n, err
func (c *myConn) Write(b []byte) (n int, err error) {
if bytes.Contains(b, []byte("ismaster")) {
fmt.Printf("Having to master check %p\n", c)
n, err = c.Conn.Write(b)
return n, err
type myLogger struct{}
func (m *myLogger) Output(calldepth int, s string) error {
fmt.Fprintln(os.Stderr, s)
return nil
func main() {
info, err := mgo.ParseURL("mongodb://localhost:32768/mgo-trawl")
if err != nil {
fmt.Println("Error parsing url", err)
info.Timeout = 2 * time.Second
info.PoolLimit = 5
dialCalls := 0
var dialMutex sync.Mutex
info.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
d := dialCalls
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
return nil, err
c := &myConn{conn, 0, d%3 == 0 /* only a few of them trips */}
fmt.Printf("Established wrapped conn %p\n", c)
return c, nil
session, err := mgo.DialWithInfo(info)
if err != nil {
fmt.Println("Error establishing session", err)
var wg sync.WaitGroup
// ensure all sockets in pool get used.
for j := 11; j >= 0; j-- {
go func(i int) {
for k := 10000; k >= 0; k-- {
s := session.Copy()
err = s.DB("mgo-trawl").C("test").Insert(bson.M{"hi": "there"})
if err != nil {
fmt.Println("Insert error!", k, err)
fmt.Println("All inserts done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.