Instantly share code, notes, and snippets.

@coder543 /tcp_relay.go Secret
Last active Aug 30, 2018

Embed
What would you like to do?
package main
import (
"bufio"
"flag"
"fmt"
"log"
"net"
"os"
"sync/atomic"
"time"
"runtime"
"sync"
"github.com/dustin/go-humanize"
)
var clientCount = 0
var allClients = make(map[chan []byte]int)
var receivedMessages uint64 = 0
var sentMessages uint64 = 0
var messageWrites uint64 = 0
var connLock sync.RWMutex
func runtimeStats(portNum string) {
var m runtime.MemStats
time.Sleep(3 * time.Second)
for {
runtime.ReadMemStats(&m)
fmt.Println()
fmt.Printf("Goroutines:\t%7d\t\t Clients:\t%7d\t\t Received Messages:\t%7d\n", runtime.NumGoroutine(), clientCount, atomic.LoadUint64(&receivedMessages))
fmt.Printf("Next GC:\t%7s\t\t Sent Msgs:\t%7d\t\t Message Writes:\t%7d\n", humanize.Bytes(m.NextGC), atomic.LoadUint64(&sentMessages), atomic.LoadUint64(&messageWrites))
fmt.Printf("Memory Acq:\t%7s\t\t Heap Alloc:\t%7s\n", humanize.Bytes(m.HeapSys), humanize.Bytes(m.HeapAlloc))
fmt.Printf("GC Pool Mem:\t%7s\t\t Heap InUse:\t%7s\n", humanize.Bytes(m.HeapIdle-m.HeapReleased), humanize.Bytes(m.HeapInuse))
fmt.Println()
time.Sleep(5 * time.Second)
}
}
func sendDataToClient(conn net.Conn, client chan []byte) {
for msg := range client {
// we will pull all buffered messages from client immediately
// which reduces the number of Writes needed to forward messages
// msg's underlying array is shared between all goroutines, so we
// need to copy it into a new slice to prevent shared mutation
buffer := append([]byte{}, msg...)
atomic.AddUint64(&sentMessages, 1)
buffered:
for {
select {
case nextMsg := <-client:
buffer = append(buffer, nextMsg...)
atomic.AddUint64(&sentMessages, 1)
default:
// once we have the last buffered message, time to send the buffer
break buffered
}
}
n, err := conn.Write(buffer)
if err != nil {
log.Printf("Client %s disconnected \n", conn.RemoteAddr().String())
removeFromConnMap(client, false)
break
} else if n != len(buffer) {
log.Printf("Client connection did not send expected number of bytes, %d != %d", n, len(buffer))
} else {
atomic.AddUint64(&messageWrites, 1)
}
}
conn.Close()
log.Println("sendDataToClient goroutine exiting")
}
func sendDataToClients(msg string) {
// VRS ADSBx specific since no newline is printed between data bursts
// we use ] and must add } closure
msg += "}"
byteMsg := []byte(msg)
connLock.RLock()
for client := range allClients {
select {
case client <- byteMsg:
// cases don't fallthrough
default:
// client is slow enough to let their buffer fill, remove them
log.Println("Client exceeded buffer, removing")
go removeFromConnMap(client, true)
}
}
connLock.RUnlock()
}
func removeFromConnMap(client chan []byte, shouldClose bool) {
connLock.Lock()
// under high load, this function can be called multiple times for a single client
_, exists := allClients[client]
if exists {
delete(allClients, client)
clientCount = len(allClients)
if shouldClose {
// only sender should close channel
close(client)
}
}
connLock.Unlock()
}
func addToConnMap(conn net.Conn) {
connLock.Lock()
log.Println("adding client", clientCount+1)
// each client is given a 'generous' buffer of a couple seconds worth of messages
connChan := make(chan []byte, 25)
allClients[connChan] = 0
clientCount = len(allClients)
connLock.Unlock()
go sendDataToClient(conn, connChan)
}
func handleTCPIncoming(hostName string, portNum string) {
conn, err := net.Dial("tcp", hostName+":"+portNum)
// exit on TCP connect failure
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// constantly read JSON from PUB-VRS and write to the buffer
data := bufio.NewReader(conn)
for {
scan, err := data.ReadString(']')
if len(scan) == 0 || err != nil {
break
}
atomic.AddUint64(&receivedMessages, 1)
go sendDataToClients(scan)
}
}
func handleTCPOutgoing(outportNum string) {
// print error on listener error
server, err := net.Listen("tcp", ":"+outportNum)
if err != nil {
log.Fatalf("Listener err: %s\n", err)
}
for {
incoming, err := server.Accept()
// print error and continue waiting
if err != nil {
log.Println(err)
} else {
go addToConnMap(incoming)
}
}
}
func main() {
hostName := flag.String("hostname", "", "what host to connect to")
portNum := flag.String("port", "", "which port to connect with")
outportNum := flag.String("listenport", "", "which port to listen on")
flag.Parse()
if *hostName == "" || *portNum == "" || *outportNum == "" {
fmt.Println("usage: dial-tcp -hostname <input> -port <port> -listenport <output>")
os.Exit(1)
}
go runtimeStats(*portNum)
go handleTCPOutgoing(*outportNum)
// if this function returns, the main thread will exit, which exits the entire program
handleTCPIncoming(*hostName, *portNum)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment