Skip to content

Instantly share code, notes, and snippets.

@GeraldHost
Created January 28, 2021 12:21
Show Gist options
  • Save GeraldHost/be057dccb8b5322444d76ce87e417d9a to your computer and use it in GitHub Desktop.
Save GeraldHost/be057dccb8b5322444d76ce87e417d9a to your computer and use it in GitHub Desktop.
go-lang-conn-pool
package main
import (
"github.com/valyala/fasthttp"
"net"
"time"
"sync"
"fmt"
)
type clientConn struct {
conn net.Conn
}
type connPool struct {
sync.Mutex
size int
dialer net.Dialer
addr string
conns []*clientConn
wp waitPool
}
type wantConn struct {
ready chan *clientConn
}
type waitPool struct {
sync.Mutex
waiting []*wantConn
}
func (wp *waitPool) queueIdle() *wantConn {
wp.Lock()
defer wp.Unlock()
wc := &wantConn {
ready: make(chan *clientConn, 1),
}
wp.waiting = append(wp.waiting, wc)
return wc
}
func (wp *waitPool) TryDeliverConn(cc *clientConn) bool {
for wp.len() > 0 {
wc := wp.Shift()
select {
case wc.ready <- cc:
return true
default:
return false
}
}
return false
}
func (wp *waitPool) len() int {
return len(wp.waiting)
}
func (wp *waitPool) Shift() *wantConn {
wp.Lock()
defer wp.Unlock()
wc := wp.waiting[0]
wp.waiting = wp.waiting[1:]
return wc
}
func (cp *connPool) Prepare() {
cp.Lock()
defer cp.Unlock()
for i:=0; i<cp.size; i++ {
conn, err := cp.dialer.Dial("tcp", cp.addr)
if err != nil {
panic(err)
}
cc := clientConn { conn: conn }
cp.conns = append(cp.conns, &cc)
}
}
func (cp *connPool) Shift() *clientConn {
cp.Lock()
defer cp.Unlock()
conn := cp.conns[0]
cp.conns = cp.conns[1:]
return conn
}
func (cp *connPool) Push(cc *clientConn) {
cp.Lock()
defer cp.Unlock()
cp.conns = append(cp.conns, cc)
}
func (cp *connPool) AcquireConn() *clientConn {
if len(cp.conns) <= 0 {
fmt.Println("Queue for conn")
wc := cp.wp.queueIdle()
select {
case cc := <- wc.ready:
fmt.Println("Got connection!")
return cc
// TODO case <- timer.done:
}
}
clientConn := cp.Shift()
return clientConn
}
func (cp *connPool) ReleaseConn(cc *clientConn) {
if len(cp.conns) >= cp.size {
fmt.Println("Collection pool full closing connection")
cc.conn.Close()
}
delivered := false
if cp.wp.len() > 0 {
fmt.Println("Try to deliver")
delivered = cp.wp.TryDeliverConn(cc)
}
if !delivered {
fmt.Println("Not delivered")
cp.Push(cc)
}
}
func (cp *connPool) ReleaseAndRedialConn(cc *clientConn) {
fmt.Println("Redial")
}
func NewConnPool(size int) connPool {
cp := connPool {
size: size,
addr: "localhost:3000",
conns: []*clientConn{},
dialer: net.Dialer{Timeout: 10 * time.Second},
}
return cp
}
func DoRequest(cp *connPool, wg *sync.WaitGroup) {
cc := cp.AcquireConn()
conn := cc.conn
fmt.Println("Do request")
buff := make([]byte, 12)
_, write_err := conn.Write([]byte("HEAD / HTTP/1.1\r\nAccept-Encoding: gzip\r\n\r\n"))
if write_err != nil {
cp.ReleaseAndRedialConn(cc)
return
}
_, read_err := conn.Read(buff)
if read_err != nil {
cp.ReleaseAndRedialConn(cc)
return
}
wg.Done()
cp.ReleaseConn(cc)
}
func main() {
var wg sync.WaitGroup
cp := NewConnPool(1)
cp.Prepare()
for i:=0; i<10; i++ {
wg.Add(1)
go DoRequest(&cp, &wg)
}
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment