Skip to content

Instantly share code, notes, and snippets.

@Dieterbe
Created February 21, 2018 16:27
Show Gist options
  • Save Dieterbe/ecfa367ce57d66fdf3a5404b2194a56f to your computer and use it in GitHub Desktop.
Save Dieterbe/ecfa367ce57d66fdf3a5404b2194a56f to your computer and use it in GitHub Desktop.
diff --git a/destination/conn.go b/destination/conn.go
index af03e57..e185266 100644
--- a/destination/conn.go
+++ b/destination/conn.go
@@ -5,6 +5,7 @@ import (
"io"
"net"
"os"
+ "sync/atomic"
"time"
"github.com/Dieterbe/go-metrics"
@@ -14,6 +15,8 @@ import (
var keepsafe_initial_cap = 100000 // not very important
+var numConns int32
+
// this interval should be long enough to capture all failure modes
// (endpoint down, delayed timeout, etc), so it should be at least as long as the flush interval
var keepsafe_keep_duration = time.Duration(10 * time.Second)
@@ -62,6 +65,8 @@ func NewConn(addr string, dest *Destination, periodFlush time.Duration, pickle b
return nil, err
}
cleanAddr := util.AddrToPath(addr)
+ atomic.AddInt32(&numConns, 1)
+ fmt.Println("DIETER NUMCONNS", numConns)
connObj := &Conn{
conn: conn,
buffered: NewWriter(conn, ioBufSize, cleanAddr),
diff --git a/destination/destination.go b/destination/destination.go
index 51f6668..e66708b 100644
--- a/destination/destination.go
+++ b/destination/destination.go
@@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/Dieterbe/go-metrics"
@@ -297,6 +298,9 @@ func (dest *Destination) relay() {
if dest.Spool {
go dest.collectRedo(conn)
}
+ atomic.AddInt32(&numConns, -1)
+ fmt.Println("DIETER NUMCONNS", numConns)
+
conn = nil
}
}
@@ -345,6 +349,9 @@ func (dest *Destination) relay() {
if conn != nil {
conn.Flush()
conn.Close()
+ conn = nil
+ atomic.AddInt32(&numConns, -1)
+ fmt.Println("DIETER NUMCONNSWOWZA", numConns)
}
if dest.spool != nil {
dest.spool.Close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment