Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple HTTP server to intelligently route and multiplex requests over a collection of backing network connections
// Small exploration into the ease of implementing an interface (and server) to abstract
// away the complexities of dealing with cumbersome and non-standardized network
// protocols.
//
// The resulting server exposes a simple HTTP API to clients, routing and forwarding
// their requests behind the scenes to a collection of raw network streams. The goal
// is to hide the underlying communication protocol and network multiplexing from
// clients, instead exposing a simple request/response API spoken in the lingua franca
// of the web.
//
// Only the Go standard library is used.
//
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"strconv"
"sync"
)
const (
MULTIPLEXED_CONN_TYPE = "tcp"
ROUTE_SERVER = "localhost"
LISTEN_PORT = 3000
)
// Simplistic example request struct to demonstrate client interaction with the
// SocketMultiplexer. In a real the client request would likely contain more
// business logic specific information/ fields.
type Request struct {
RouteId string // some identifier to choose from the possible available connections
Data string // the data to send
}
// Simplistic example response struct to demonstrate client interaction with the
// SocketMultiplexer. In a real the server response would likely contain more
// business logic specific information/ fields.
type Response struct {
Success bool // whether or not the request was successful
Data string // response data
}
// A Route allows for synchronous communication over a given network stream
// The writer and scanner dictate the specific dialect that is extracted from
// the stream of bytes sent and received on the connection.
type Route struct {
mutex sync.Mutex
conn net.Conn
writer *bufio.Writer
scanner *bufio.Scanner
}
// Initialize a new Route based on the given network connection. Here we automatically
// initialize the underlying scanner and writer, but in a real application they would
// likely have custom initialization logic to configure the reading and writing of bytes
// based on the underlying network protocol.
func NewRoute(conn net.Conn) *Route {
route := Route{}
route.conn = conn
route.writer = bufio.NewWriter(conn)
route.scanner = bufio.NewScanner(conn)
return &route
}
// Transmit allows for synchronous communication over the given Routes'
// underlying connection
//
// This function also serves as an example of bidirectional communication
// over a raw data stream. In this case, we're using newlines as the delimiter,
// but the actual message format could be anything, with the Routes writer and
// scanner configured to understand the protocol.
func (r *Route) Transmit(data []byte) ([]byte, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
// write request data
_, err := r.writer.Write(data)
if err == nil {
_, err = r.writer.WriteString("\r\n")
}
if err == nil {
err = r.writer.Flush()
}
if err != nil {
return nil, err
}
// read response
r.scanner.Scan()
return r.scanner.Bytes(), nil
}
// A SocketMultiplexer server exposes a simple HTTP API to clients, listening for
// requests and automatically routing them to the appropriate underlying network stream.
// Client data is forwarded as raw bytes, and the response is collected and returned
// as a regular HTTP response.
//
// The SocketMultiplexer ensures that concurrent requests to the same network stream do
// not mangle eachother, and maintains long-lived connections to avoid handshake
// overhead.
type SocketMultiplexer struct {
listenPort int
routesMutex sync.RWMutex
routes map[string]*Route
}
func NewSocketMultiplexer(listenPort int) *SocketMultiplexer {
return &SocketMultiplexer{listenPort: listenPort, routes: make(map[string]*Route)}
}
func (s *SocketMultiplexer) Port() int {
return s.listenPort
}
// Listen starts up a blocking HTTP server that listens for client requests and
// forwards them on to the appropriate SocketMultiplexer handler. (Currently just a
// single demonstration handler, but more could easily be registered based on use case)
// Note that each incoming request is handled by its own go routine, so multiple
// requests can run concurrently. This increases throughput, but requires that
// the SocketMultiplexer internals are threadsafe.
func (s *SocketMultiplexer) Listen() error {
mux := http.NewServeMux()
// Register external HTTP API for clients to interact with
mux.HandleFunc("/transmit", s.transmitHandler)
log.Printf("Listening on port %d", s.listenPort)
return http.ListenAndServe(fmt.Sprintf(":%d", s.listenPort), mux)
}
// transmitHandler orchestrates multiplexing an incoming request over the appropriate
// underlying network stream. It is responsible for parsing and validating the
// supplied request, routing and transmitting data, and packaging up the underlying
// response in a form that the caller can understand.
//
// For demonstration purposes, we use a simple JSON request/ response format, but
// this can easily be changed to whatever format clients expect.
func (s *SocketMultiplexer) transmitHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
log.Println("Ignoring non-POST request")
http.Error(w, "Can only transmit via POST request", http.StatusMethodNotAllowed)
return
}
// Parse request
var request Request
err := json.NewDecoder(r.Body).Decode(&request)
if err != nil {
log.Println("Unable to decode request:", err.Error())
http.Error(w, "Unable to decode request", http.StatusInternalServerError)
return
}
// Route request
route, ok := s.getRoute(request.RouteId)
if !ok {
http.Error(
w,
fmt.Sprintf("No route established for the given ID (%s)", request.RouteId),
http.StatusBadRequest)
return
}
responseData, err := s.transmitData(route, request.Data)
if err != nil {
log.Println("Unable to communicate with underlying network stream", err.Error())
http.Error(w, "Network connectivity issue", http.StatusInternalServerError)
return
}
// Respond
response := Response{Success: true, Data: responseData}
// Note that while "success" is hardcoded to true here, in a real application,
// its value would depend on whether the service at the other end of the
// connection thought the request was successful or not
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// AddRoute adds a TCP stream for the given host and port identified by the supplied
// route ID. Later requests and responses can use that route ID to send and receive data
// on that stream.
//
// Errors if the route ID already exists to avoid overwriting an active stream
func (s *SocketMultiplexer) AddRoute(routeId string, host string, port int) error {
s.routesMutex.Lock()
defer s.routesMutex.Unlock()
if _, ok := s.routes[routeId]; ok {
return fmt.Errorf("Route ID %s already exists", routeId)
}
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, net.JoinHostPort(host, strconv.Itoa(port)))
if err != nil {
return err
}
s.routes[routeId] = NewRoute(conn)
return nil
}
// getRoute fetches the route associated with the given routeId in a threadsafe fashion
func (s *SocketMultiplexer) getRoute(routeId string) (*Route, bool) {
s.routesMutex.RLock()
defer s.routesMutex.RUnlock()
route, ok := s.routes[routeId]
return route, ok
}
// transmitData transmits the given data over the supplied route. Note that once you
// have a pointer to a specific route, you aren't required to protect it against
// concurrent access, because the Route structure itself is designed to be threadsafe.
func (s *SocketMultiplexer) transmitData(route *Route, data string) (string, error) {
responseBytes, err := route.Transmit([]byte(data))
return string(responseBytes), err
}
// Demonstration usage of the SocketMultiplexer.
//
// Here, we create a new SocketMultiplexer with hardcoded, preregistered
// routes. In a real application, routes would likely be part of a config file, or
// registered on the fly, both of which are fully supported via the AddRoute function.
// We only hardcode them here for simplicity and easy of demonstration.
//
// In order to run this main method, you'll first have to run a server capable of
// ACKing TCP connections on ports 3001 and 3002. The simplest way to do that is via
// the netcat tool: `nc -l PORT_NUMBER`. In our case, that means 2 instances of netcat
// running on ports 3001 and 3002. These two netcat processes simulate the types of
// raw socket communicating network services that this SocketMultiplexer is useful in
// fronting.
//
// Once that's done, you can run the SocketMultiplexer via your standard `go run`
// command (or equivalent). This will start a blocking http server, as well as
// initiate connections to the specified server + ports.
//
// With those two pieces in place, you can see the entire system working end to end by
// mimicing a client request. Using cURL (or equivalent), you can make a POST request
// to the port that the SockerMultiplexer server is running (3000 by default).
// For example: `curl --header "Content-Type: application/json" --request POST --data '{"RouteId": "a", "Data": "test request"}' http://localhost:3000/transmit`
// You will see the request data you submitted reflected in the netcat output, and
// should be able to type a response and have it appear as a response to the cURL
// request. This demonstrates that the client HTTP requests are getting correctly
// multiplexed over the underlying raw network connections.
//
// For more comprehensive tests, see the accompanying Go test suite.
func main() {
server := NewSocketMultiplexer(LISTEN_PORT)
err1 := server.AddRoute("a", ROUTE_SERVER, 3001)
err2 := server.AddRoute("b", ROUTE_SERVER, 3002)
if err1 != nil || err2 != nil {
log.Panicln("Failed to initialize socket routes:", err1, err2)
}
log.Fatal(server.Listen())
}
package main
import (
"bufio"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
const (
TestRequest = "testRequest"
TestResponse = "testResponse"
)
// Atomic counter to track available ports (assuming no external services are using
// them) and ensure that tests running in parallal in this suite don't attempt
// (and fail) to concurrently listen on the same port.
var AVAILABLE_PORT int32 = 3000
func nextPortNum() int {
return int(atomic.AddInt32(&AVAILABLE_PORT, 1))
}
// A Delayer implements a delay strategy to artifically simulate load
type Delayer interface {
Delay()
}
// NoDelayer is a Delayer that doesn't do any delaying
type NoDelayer struct{}
func (n *NoDelayer) Delay() {}
// RandomDelayer is a Delayer that delays a random amount between 0 and maxDelayMillis milliseconds
type RandomDelayer struct {
maxDelayMillis int // maximum delay in milliseconds
}
func (r *RandomDelayer) Delay() {
time.Sleep(time.Duration(rand.Intn(r.maxDelayMillis)) * time.Millisecond)
}
// A Responder is responsible for validating an incoming test request message and
// constructing a response message based on that request
type Responder interface {
ValidRequest(string) bool
Respond(string) string
}
// HardCodedResponder is a Responder that validates against a hardcoded request, and
// responds with a hardcoded response
type HardCodedResponder struct {
request string
response string
}
func (h *HardCodedResponder) ValidRequest(message string) bool { return message == h.request }
func (h *HardCodedResponder) Respond(message string) string { return h.response }
// EchoResponder is a Responder that responds with the original request message. No
// validation is performed on the incoming request.
//
// This responder is a simple way to ensure that clients are receiving the correct data
// from a multiplexed connection. Each client can construct a unique request and then
// verify that the response they receive contains that unique data.
type EchoResponder struct{}
func (e *EchoResponder) ValidRequest(message string) bool { return true }
func (e *EchoResponder) Respond(message string) string { return message }
// TestServer is a minimal implementation of a server that communicates directly via
// a raw network connection. It mimics the salient behavior of the type of servers
// that the SocketMultiplexer is meant to front without caring about the actual data
// being sent and received.
//
// Importantly, a given instance of TestServer only accepts a single connection before
// shutting down (but loops forever on communication over that single connection). This
// is in contrast to a normal server, which listens continually on its port, accepting
// as many connections as possible. This behavior ensures that the SocketMultiplexer is
// keeping connections open as expected, because otherwise it will lose it's ability to
// communicate with the TestServer.
type TestServer struct {
connType string
port int
delayer Delayer
responder Responder
shutdownTrigger chan struct{}
}
// NewTestServer initializes a new TestServer to listen on the provided port.
// The provided Delayer is used to determine the network/ server latency strategy to
// simulate. Likewise, the provided Responder is used to determine the request/ response
// validation strategy.
func NewTestServer(port int, delayer Delayer, responder Responder) *TestServer {
return &TestServer{
connType: MULTIPLEXED_CONN_TYPE,
port: port,
delayer: delayer,
responder: responder,
shutdownTrigger: make(chan struct{}),
}
}
// Address returns the dialable address that this server is listening on
func (s *TestServer) Address() string {
return fmt.Sprintf("%s:%d", ROUTE_SERVER, s.port)
}
// Serve causes the TestServer to block and listen for a single connection on the
// associated port. Once a client connects, the connection remains open until the client
// disconnects, with data managed by the TestServer's Responder.
func (s *TestServer) Serve(t *testing.T) {
l, err := net.Listen(s.connType, fmt.Sprintf(":%d", s.port))
if err != nil {
t.Fatal(err)
}
defer l.Close()
t.Log("Waiting for connection")
conn, err := l.Accept()
t.Log("Got a connection")
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Loop, receiving and sending the specified messages. Note that we don't close the
// connection after every successful negotiation like you would do if you were dealing
// with an http connection. This simulates the kind of long running network connection
// that the SocketMultiplexer is made to deal with.
for {
select {
case <-s.shutdownTrigger:
return
default:
scanner := bufio.NewScanner(conn)
successfulScan := scanner.Scan()
if !successfulScan {
t.Log("Connection closed")
return
}
receivedMessage := scanner.Text()
if !s.responder.ValidRequest(receivedMessage) {
t.Error("Received invalid message:", receivedMessage)
}
// Simulate artificial delay
s.delayer.Delay()
writer := bufio.NewWriter(conn)
_, err1 := writer.WriteString(s.responder.Respond(receivedMessage) + "\r\n")
err2 := writer.Flush()
if err1 != nil || err2 != nil {
t.Error("Couldn't write response", err1, err2)
}
}
}
}
// Shutdown the TestServer and break out of its network connection listen loop
func (s *TestServer) Shutdown() {
close(s.shutdownTrigger)
}
// Test to ensure that the Route construct can successfully transmit data over its
// underlying network connection
func TestRouteConnections(t *testing.T) {
ts := NewTestServer(nextPortNum(), &NoDelayer{}, &HardCodedResponder{request: TestRequest, response: TestResponse})
go ts.Serve(t)
defer ts.Shutdown()
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, ts.Address())
if err != nil {
t.Error("Unable to connect to test server")
}
testRoute := NewRoute(conn)
response, err := testRoute.Transmit([]byte(TestRequest))
if err != nil {
t.Error("Unable to transmit data")
}
if string(response) != TestResponse {
t.Errorf("Expected %s, but got %s", TestResponse, response)
}
}
// Test to ensure that the SocketMultiplexer properly supports adding routes, fetching
// routes, and handling unspecified routes
func TestRouteValidation(t *testing.T) {
port := nextPortNum()
ts := NewTestServer(port, &NoDelayer{}, &EchoResponder{})
go ts.Serve(t)
defer ts.Shutdown()
muxServer := NewSocketMultiplexer(3000)
err := muxServer.AddRoute("a", ROUTE_SERVER, port)
if err != nil {
t.Fatal("Failed to initialize socket routes:", err)
}
_, ok := muxServer.getRoute("a")
if !ok {
t.Error("Expected valid route to return OK")
}
_, ok = muxServer.getRoute("b")
if ok {
t.Error("Expected invalid route to return false OK value")
}
}
// Test to ensure that a route stored in the SocketMultiplexer can be successfully
// retrieved and transmitted over.
func TestMultiplexerTransmission(t *testing.T) {
port := nextPortNum()
ts := NewTestServer(port, &NoDelayer{}, &HardCodedResponder{request: TestRequest, response: TestResponse})
go ts.Serve(t)
defer ts.Shutdown()
newRouteId := "c"
muxServer := NewSocketMultiplexer(3000)
err := muxServer.AddRoute(newRouteId, "localhost", port)
if err != nil {
t.Error("Unable to create new multiplexed route", err)
}
route, ok := muxServer.getRoute(newRouteId)
if !ok {
t.Errorf("Newly created route %v not found", newRouteId)
}
resp, err := muxServer.transmitData(route, TestRequest)
if err != nil {
t.Error("Couldn't transmit data on newly created route", err)
}
if resp != TestResponse {
t.Errorf("Expected response of %s on new route, but got %s", TestResponse, resp)
}
}
// Helper to simulate and submit a client request, and do intermediate validation
// before returning the response.
func SubmitClientRequest(t *testing.T, address string, jsonBlob string) Response {
resp, err := http.Post(address, "application/json", strings.NewReader(jsonBlob))
if err != nil {
t.Fatalf("Error making client request %q to address %q: %v", jsonBlob, address, err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Received %v status code for request %q", resp.StatusCode, jsonBlob)
}
var response Response
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
t.Errorf("Unable to decode response for request %q: %v", jsonBlob, err)
}
return response
}
// Test to ensure that Routes are usable for many, long-lived requests, with arbitrary
// network delays.
func TestRouteConnectionWithDelay(t *testing.T) {
delayer := &RandomDelayer{maxDelayMillis: 1000}
responder := &HardCodedResponder{request: TestRequest, response: TestResponse}
ts := NewTestServer(nextPortNum(), delayer, responder)
go ts.Serve(t)
defer ts.Shutdown()
// Note that we initialize the connection once up front and then use it (via the
// Route construct) many times, which is precisely what the SocketMultiplexer needs
// to do under the hood.
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, ts.Address())
if err != nil {
t.Error("Unable to connect to test server")
}
testRoute := NewRoute(conn)
iterations := 10
for n := 0; n < iterations; n++ {
response, err := testRoute.Transmit([]byte(TestRequest))
if err != nil {
t.Errorf("[%v/%v] Unable to transmit data", n, iterations)
}
if string(response) != TestResponse {
t.Errorf("[%v/%v] Expected %s, but got %s", n, iterations, TestResponse, response)
}
}
}
// Helper to automatically spin up a SocketMultipler with each of the desired routes
// automatically associated with an arbitrary network connection backed by a TestServer.
//
// This is a means of reducing the redundant plumbing needed to hook up a working
// test SocketMultiplexer with valid backing network connections.
func NewMuxWithBackingConns(t *testing.T, desiredRoutes []string, delayer Delayer, responder Responder) (*SocketMultiplexer, []*TestServer) {
muxServer := NewSocketMultiplexer(nextPortNum())
testServers := make([]*TestServer, len(desiredRoutes))
for i, route := range desiredRoutes {
port := nextPortNum()
ts := NewTestServer(port, delayer, responder)
go ts.Serve(t)
testServers[i] = ts
err := muxServer.AddRoute(route, ROUTE_SERVER, port)
if err != nil {
t.Error("Unable to create new multiplexed route", err)
}
}
return muxServer, testServers
}
// Helper to shutdown a collection of TestServers in one go
func massShutdown(testServers []*TestServer) {
for _, ts := range testServers {
ts.Shutdown()
}
}
// Test to ensure that client requests are correctly multiplexed by the SocketMultiplexer
// to the underlying network connections, and responses are correctly returned to the
// client via HTTP response
func TestMultiplexingRoutes(t *testing.T) {
routes := []string{"d", "e"}
muxServer, testServers := NewMuxWithBackingConns(t, routes, &NoDelayer{}, &EchoResponder{})
go muxServer.Listen()
defer massShutdown(testServers)
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port())
requestTemplate := `{"RouteId": "%s", "Data": "%s-%v"}`
responseTemplate := "%s-%v"
for _, route := range routes {
request := fmt.Sprintf(requestTemplate, route, TestRequest, route)
response := SubmitClientRequest(t, address, request)
if response.Data != fmt.Sprintf(responseTemplate, TestRequest, route) {
t.Errorf("Received incorrect data from multiplexed connection for route %q", route)
}
}
}
// Helper to choose a random string value from a list of strings. Useful to simulate
// random load on the SocketMultiplexer from clients looking to communicate on
// arbitrary routes.
func chooseRandomly(choices []string) string {
return choices[rand.Intn(len(choices))]
}
// Test to ensure that the SocketMultiplexer is capable of handling work from multiple
// clients in parallel while properly routing data between the backing network
// connections.
func TestMultiplexingConcurrentConnections(t *testing.T) {
routes := []string{"f", "g"}
muxServer, testServers := NewMuxWithBackingConns(t, routes, &NoDelayer{}, &EchoResponder{})
go muxServer.Listen()
defer massShutdown(testServers)
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port())
requestTemplate := `{"RouteId": "%s", "Data": "ThreadId[%d]-RouteId[%s]"}`
responseTemplate := "ThreadId[%d]-RouteId[%s]"
threads := 10
var wg sync.WaitGroup
for n := 0; n < threads; n++ {
wg.Add(1)
go func(threadId int) {
t.Logf("Thread %d starting", threadId)
defer wg.Done()
routeId := chooseRandomly(routes)
response := SubmitClientRequest(t, address, fmt.Sprintf(requestTemplate, routeId, threadId, routeId))
if response.Success != true {
t.Error("Expected successful response")
}
expectedResponseData := fmt.Sprintf(responseTemplate, threadId, routeId)
if response.Data != expectedResponseData {
t.Errorf("Response data %q differs from expected response of %q", response.Data, expectedResponseData)
}
}(n)
}
wg.Wait() // Wait for all the goroutines to finish before considering the test done
}
// Test to ensure that the SocketMultiplexer is capable of handling work from multiple
// clients in parallel while properly routing data between the backing network
// connections even in the face of network/ backend delays.
func TestMultiplexingConcurrentConnectionsWithDelay(t *testing.T) {
routes := []string{"h", "i", "j"}
delayer := &RandomDelayer{maxDelayMillis: 1000}
muxServer, testServers := NewMuxWithBackingConns(t, routes, delayer, &EchoResponder{})
go muxServer.Listen()
defer massShutdown(testServers)
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port())
requestTemplate := `{"RouteId": "%s", "Data": "ThreadId[%d]-RouteId[%s]"}`
responseTemplate := "ThreadId[%d]-RouteId[%s]"
threads := 30
var wg sync.WaitGroup
for n := 0; n < threads; n++ {
wg.Add(1)
go func(threadId int) {
t.Logf("Thread %d starting", threadId)
defer wg.Done()
routeId := chooseRandomly(routes)
response := SubmitClientRequest(t, address, fmt.Sprintf(requestTemplate, routeId, threadId, routeId))
if response.Success != true {
t.Error("Expected successful response")
}
expectedResponseData := fmt.Sprintf(responseTemplate, threadId, routeId)
if response.Data != expectedResponseData {
t.Errorf("Response data %q differs from expected response of %q", response.Data, expectedResponseData)
}
}(n)
}
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment