|
diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go |
|
index 8db9c78..861005f 100644 |
|
--- a/src/database/sql/sql.go |
|
+++ b/src/database/sql/sql.go |
|
@@ -217,6 +217,7 @@ type DB struct { |
|
connRequests []chan connRequest |
|
numOpen int |
|
pendingOpens int |
|
+ numClosed int64 |
|
// Used to signal the need for new connections |
|
// a goroutine running connectionOpener() reads on this chan and |
|
// maybeOpenNewConnections sends on the chan (one send per needed connection) |
|
@@ -224,10 +225,13 @@ type DB struct { |
|
// goroutine to exit. |
|
openerCh chan struct{} |
|
closed bool |
|
- dep map[finalCloser]depSet |
|
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only |
|
maxIdle int // zero means defaultMaxIdleConns; negative means 0 |
|
maxOpen int // <= 0 means unlimited |
|
+ |
|
+ // Split from mu to avoid lock contention when using prepared statement in paralle. |
|
+ depmu sync.Mutex |
|
+ dep map[finalCloser]depSet |
|
} |
|
|
|
// driverConn wraps a driver.Conn with a mutex, to |
|
@@ -246,7 +250,7 @@ type driverConn struct { |
|
// guarded by db.mu |
|
inUse bool |
|
onPut []func() // code (with db.mu held) run when conn is next returned |
|
- dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree |
|
+ dbmuClosed bool // same as closed, but guarded by db.mu, for connStmt |
|
} |
|
|
|
func (dc *driverConn) releaseConn(err error) { |
|
@@ -291,7 +295,7 @@ func (dc *driverConn) closeDBLocked() func() error { |
|
return func() error { return errors.New("sql: duplicate driverConn close") } |
|
} |
|
dc.closed = true |
|
- return dc.db.removeDepLocked(dc, dc) |
|
+ return dc.db.removeDep(dc, dc) |
|
} |
|
|
|
func (dc *driverConn) Close() error { |
|
@@ -306,7 +310,7 @@ func (dc *driverConn) Close() error { |
|
// And now updates that require holding dc.mu.Lock. |
|
dc.db.mu.Lock() |
|
dc.dbmuClosed = true |
|
- fn := dc.db.removeDepLocked(dc, dc) |
|
+ fn := dc.db.removeDep(dc, dc) |
|
dc.db.mu.Unlock() |
|
return fn() |
|
} |
|
@@ -326,6 +330,7 @@ func (dc *driverConn) finalClose() error { |
|
|
|
dc.db.mu.Lock() |
|
dc.db.numOpen-- |
|
+ dc.db.numClosed++ |
|
dc.db.maybeOpenNewConnections() |
|
dc.db.mu.Unlock() |
|
|
|
@@ -361,12 +366,7 @@ type finalCloser interface { |
|
// called until all of x's dependencies are removed with removeDep. |
|
func (db *DB) addDep(x finalCloser, dep interface{}) { |
|
//println(fmt.Sprintf("addDep(%T %p, %T %p)", x, x, dep, dep)) |
|
- db.mu.Lock() |
|
- defer db.mu.Unlock() |
|
- db.addDepLocked(x, dep) |
|
-} |
|
- |
|
-func (db *DB) addDepLocked(x finalCloser, dep interface{}) { |
|
+ db.depmu.Lock() |
|
if db.dep == nil { |
|
db.dep = make(map[finalCloser]depSet) |
|
} |
|
@@ -376,22 +376,16 @@ func (db *DB) addDepLocked(x finalCloser, dep interface{}) { |
|
db.dep[x] = xdep |
|
} |
|
xdep[dep] = true |
|
+ db.depmu.Unlock() |
|
} |
|
|
|
// removeDep notes that x no longer depends on dep. |
|
-// If x still has dependencies, nil is returned. |
|
-// If x no longer has any dependencies, its finalClose method will be |
|
-// called and its error value will be returned. |
|
-func (db *DB) removeDep(x finalCloser, dep interface{}) error { |
|
- db.mu.Lock() |
|
- fn := db.removeDepLocked(x, dep) |
|
- db.mu.Unlock() |
|
- return fn() |
|
-} |
|
- |
|
-func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error { |
|
+// If x still has dependencies, empty function is returned. |
|
+// If x no longer has any dependencies, its finalClose method is returned. |
|
+func (db *DB) removeDep(x finalCloser, dep interface{}) func() error { |
|
//println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep)) |
|
- |
|
+ db.depmu.Lock() |
|
+ defer db.depmu.Unlock() |
|
xdep, ok := db.dep[x] |
|
if !ok { |
|
panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x)) |
|
@@ -612,7 +606,7 @@ func (db *DB) openNewConnection() { |
|
ci: ci, |
|
} |
|
if db.putConnDBLocked(dc, err) { |
|
- db.addDepLocked(dc, dc) |
|
+ db.addDep(dc, dc) |
|
db.numOpen++ |
|
} else { |
|
ci.Close() |
|
@@ -672,7 +666,7 @@ func (db *DB) conn() (*driverConn, error) { |
|
db: db, |
|
ci: ci, |
|
} |
|
- db.addDepLocked(dc, dc) |
|
+ db.addDep(dc, dc) |
|
dc.inUse = true |
|
db.mu.Unlock() |
|
return dc, nil |
|
@@ -683,42 +677,6 @@ var ( |
|
errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy") |
|
) |
|
|
|
-// connIfFree returns (wanted, nil) if wanted is still a valid conn and |
|
-// isn't in use. |
|
-// |
|
-// The error is errConnClosed if the connection if the requested connection |
|
-// is invalid because it's been closed. |
|
-// |
|
-// The error is errConnBusy if the connection is in use. |
|
-func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) { |
|
- db.mu.Lock() |
|
- defer db.mu.Unlock() |
|
- if wanted.dbmuClosed { |
|
- return nil, errConnClosed |
|
- } |
|
- if wanted.inUse { |
|
- return nil, errConnBusy |
|
- } |
|
- idx := -1 |
|
- for ii, v := range db.freeConn { |
|
- if v == wanted { |
|
- idx = ii |
|
- break |
|
- } |
|
- } |
|
- if idx >= 0 { |
|
- db.freeConn = append(db.freeConn[:idx], db.freeConn[idx+1:]...) |
|
- wanted.inUse = true |
|
- return wanted, nil |
|
- } |
|
- // TODO(bradfitz): shouldn't get here. After Go 1.1, change this to: |
|
- // panic("connIfFree call requested a non-closed, non-busy, non-free conn") |
|
- // Which passes all the tests, but I'm too paranoid to include this |
|
- // late in Go 1.1. |
|
- // Instead, treat it like a busy connection: |
|
- return nil, errConnBusy |
|
-} |
|
- |
|
// putConnHook is a hook for testing. |
|
var putConnHook func(*DB, *driverConn) |
|
|
|
@@ -855,10 +813,14 @@ func (db *DB) prepare(query string) (*Stmt, error) { |
|
db.putConn(dc, err) |
|
return nil, err |
|
} |
|
+ db.mu.Lock() |
|
+ numClosed := db.numClosed |
|
+ db.mu.Unlock() |
|
stmt := &Stmt{ |
|
- db: db, |
|
- query: query, |
|
- css: []connStmt{{dc, si}}, |
|
+ db: db, |
|
+ query: query, |
|
+ css: []connStmt{{dc, si}}, |
|
+ lastNumClosed: numClosed, |
|
} |
|
db.addDep(stmt, stmt) |
|
db.putConn(dc, nil) |
|
@@ -1292,7 +1254,8 @@ type Stmt struct { |
|
// that are valid on particular connections. This is only |
|
// used if tx == nil and one is found that has idle |
|
// connections. If tx != nil, txsi is always used. |
|
- css []connStmt |
|
+ css []connStmt |
|
+ lastNumClosed int64 |
|
} |
|
|
|
// Exec executes a prepared statement with the given arguments and |
|
@@ -1372,27 +1335,24 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St |
|
return ci, releaseConn, s.txsi.si, nil |
|
} |
|
|
|
- for i := 0; i < len(s.css); i++ { |
|
- v := s.css[i] |
|
- _, err := s.db.connIfFree(v.dc) |
|
- if err == nil { |
|
- s.mu.Unlock() |
|
- return v.dc, v.dc.releaseConn, v.si, nil |
|
- } |
|
- if err == errConnClosed { |
|
- // Lazily remove dead conn from our freelist. |
|
- s.css[i] = s.css[len(s.css)-1] |
|
- s.css = s.css[:len(s.css)-1] |
|
- i-- |
|
+ // Lazily remove dead conn from our freelist. |
|
+ s.db.mu.Lock() |
|
+ dbClosed := s.db.numClosed |
|
+ if dbClosed-s.lastNumClosed > int64(s.db.numOpen) { |
|
+ j := 0 |
|
+ for i := 0; i < len(s.css); i++ { |
|
+ v := s.css[i] |
|
+ if !v.dc.dbmuClosed { |
|
+ s.css[j] = v |
|
+ j++ |
|
+ } |
|
} |
|
- |
|
+ s.css = s.css[:j] |
|
} |
|
+ s.lastNumClosed = dbClosed |
|
+ s.db.mu.Unlock() |
|
s.mu.Unlock() |
|
|
|
- // If all connections are busy, either wait for one to become available (if |
|
- // we've already hit the maximum number of open connections) or create a |
|
- // new one. |
|
- // |
|
// TODO(bradfitz): or always wait for one? make configurable later? |
|
dc, err := s.db.conn() |
|
if err != nil { |
|
@@ -1454,7 +1414,7 @@ func (s *Stmt) Query(args ...interface{}) (*Rows, error) { |
|
s.db.addDep(s, rows) |
|
rows.releaseConn = func(err error) { |
|
releaseConn(err) |
|
- s.db.removeDep(s, rows) |
|
+ s.db.removeDep(s, rows)() |
|
} |
|
return rows, nil |
|
} |
|
@@ -1534,7 +1494,7 @@ func (s *Stmt) Close() error { |
|
} |
|
s.mu.Unlock() |
|
|
|
- return s.db.removeDep(s, s) |
|
+ return s.db.removeDep(s, s)() |
|
} |
|
|
|
func (s *Stmt) finalClose() error { |