| package main | |
| import ( | |
| "bufio" | |
| "flag" | |
| "fmt" | |
| "log" | |
| "net" | |
| "os" | |
| "sync/atomic" | |
| "time" | |
| "runtime" | |
| "sync" | |
| "github.com/dustin/go-humanize" | |
| ) | |
| var clientCount = 0 | |
| var allClients = make(map[chan []byte]int) | |
| var receivedMessages uint64 = 0 | |
| var sentMessages uint64 = 0 | |
| var messageWrites uint64 = 0 | |
| var connLock sync.RWMutex | |
| func runtimeStats(portNum string) { | |
| var m runtime.MemStats | |
| time.Sleep(3 * time.Second) | |
| for { | |
| runtime.ReadMemStats(&m) | |
| fmt.Println() | |
| fmt.Printf("Goroutines:\t%7d\t\t Clients:\t%7d\t\t Received Messages:\t%7d\n", runtime.NumGoroutine(), clientCount, atomic.LoadUint64(&receivedMessages)) | |
| fmt.Printf("Next GC:\t%7s\t\t Sent Msgs:\t%7d\t\t Message Writes:\t%7d\n", humanize.Bytes(m.NextGC), atomic.LoadUint64(&sentMessages), atomic.LoadUint64(&messageWrites)) | |
| fmt.Printf("Memory Acq:\t%7s\t\t Heap Alloc:\t%7s\n", humanize.Bytes(m.HeapSys), humanize.Bytes(m.HeapAlloc)) | |
| fmt.Printf("GC Pool Mem:\t%7s\t\t Heap InUse:\t%7s\n", humanize.Bytes(m.HeapIdle-m.HeapReleased), humanize.Bytes(m.HeapInuse)) | |
| fmt.Println() | |
| time.Sleep(5 * time.Second) | |
| } | |
| } | |
| func sendDataToClient(conn net.Conn, client chan []byte) { | |
| for msg := range client { | |
| // we will pull all buffered messages from client immediately | |
| // which reduces the number of Writes needed to forward messages | |
| // msg's underlying array is shared between all goroutines, so we | |
| // need to copy it into a new slice to prevent shared mutation | |
| buffer := append([]byte{}, msg...) | |
| atomic.AddUint64(&sentMessages, 1) | |
| buffered: | |
| for { | |
| select { | |
| case nextMsg := <-client: | |
| buffer = append(buffer, nextMsg...) | |
| atomic.AddUint64(&sentMessages, 1) | |
| default: | |
| // once we have the last buffered message, time to send the buffer | |
| break buffered | |
| } | |
| } | |
| n, err := conn.Write(buffer) | |
| if err != nil { | |
| log.Printf("Client %s disconnected \n", conn.RemoteAddr().String()) | |
| removeFromConnMap(client, false) | |
| break | |
| } else if n != len(buffer) { | |
| log.Printf("Client connection did not send expected number of bytes, %d != %d", n, len(buffer)) | |
| } else { | |
| atomic.AddUint64(&messageWrites, 1) | |
| } | |
| } | |
| conn.Close() | |
| log.Println("sendDataToClient goroutine exiting") | |
| } | |
| func sendDataToClients(msg string) { | |
| // VRS ADSBx specific since no newline is printed between data bursts | |
| // we use ] and must add } closure | |
| msg += "}" | |
| byteMsg := []byte(msg) | |
| connLock.RLock() | |
| for client := range allClients { | |
| select { | |
| case client <- byteMsg: | |
| // cases don't fallthrough | |
| default: | |
| // client is slow enough to let their buffer fill, remove them | |
| log.Println("Client exceeded buffer, removing") | |
| go removeFromConnMap(client, true) | |
| } | |
| } | |
| connLock.RUnlock() | |
| } | |
| func removeFromConnMap(client chan []byte, shouldClose bool) { | |
| connLock.Lock() | |
| // under high load, this function can be called multiple times for a single client | |
| _, exists := allClients[client] | |
| if exists { | |
| delete(allClients, client) | |
| clientCount = len(allClients) | |
| if shouldClose { | |
| // only sender should close channel | |
| close(client) | |
| } | |
| } | |
| connLock.Unlock() | |
| } | |
| func addToConnMap(conn net.Conn) { | |
| connLock.Lock() | |
| log.Println("adding client", clientCount+1) | |
| // each client is given a 'generous' buffer of a couple seconds worth of messages | |
| connChan := make(chan []byte, 25) | |
| allClients[connChan] = 0 | |
| clientCount = len(allClients) | |
| connLock.Unlock() | |
| go sendDataToClient(conn, connChan) | |
| } | |
| func handleTCPIncoming(hostName string, portNum string) { | |
| conn, err := net.Dial("tcp", hostName+":"+portNum) | |
| // exit on TCP connect failure | |
| if err != nil { | |
| log.Fatal(err) | |
| } | |
| defer conn.Close() | |
| // constantly read JSON from PUB-VRS and write to the buffer | |
| data := bufio.NewReader(conn) | |
| for { | |
| scan, err := data.ReadString(']') | |
| if len(scan) == 0 || err != nil { | |
| break | |
| } | |
| atomic.AddUint64(&receivedMessages, 1) | |
| go sendDataToClients(scan) | |
| } | |
| } | |
| func handleTCPOutgoing(outportNum string) { | |
| // print error on listener error | |
| server, err := net.Listen("tcp", ":"+outportNum) | |
| if err != nil { | |
| log.Fatalf("Listener err: %s\n", err) | |
| } | |
| for { | |
| incoming, err := server.Accept() | |
| // print error and continue waiting | |
| if err != nil { | |
| log.Println(err) | |
| } else { | |
| go addToConnMap(incoming) | |
| } | |
| } | |
| } | |
| func main() { | |
| hostName := flag.String("hostname", "", "what host to connect to") | |
| portNum := flag.String("port", "", "which port to connect with") | |
| outportNum := flag.String("listenport", "", "which port to listen on") | |
| flag.Parse() | |
| if *hostName == "" || *portNum == "" || *outportNum == "" { | |
| fmt.Println("usage: dial-tcp -hostname <input> -port <port> -listenport <output>") | |
| os.Exit(1) | |
| } | |
| go runtimeStats(*portNum) | |
| go handleTCPOutgoing(*outportNum) | |
| // if this function returns, the main thread will exit, which exits the entire program | |
| handleTCPIncoming(*hostName, *portNum) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment