Last active
February 27, 2021 16:23
-
-
Save jasonrdsouza/95f7a98d28f0c8538eb78479a8f05f0a to your computer and use it in GitHub Desktop.
Simple HTTP server to intelligently route and multiplex requests over a collection of backing network connections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Small exploration into the ease of implementing an interface (and server) to abstract | |
// away the complexities of dealing with cumbersome and non-standardized network | |
// protocols. | |
// | |
// The resulting server exposes a simple HTTP API to clients, routing and forwarding | |
// their requests behind the scenes to a collection of raw network streams. The goal | |
// is to hide the underlying communication protocol and network multiplexing from | |
// clients, instead exposing a simple request/response API spoken in the lingua franca | |
// of the web. | |
// | |
// Only the Go standard library is used. | |
// | |
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"fmt" | |
"log" | |
"net" | |
"net/http" | |
"strconv" | |
"sync" | |
) | |
// Settings shared by the demonstration server and its test suite.
const (
	// MULTIPLEXED_CONN_TYPE is the network name handed to net.Dial (and, in
	// the tests, net.Listen) for every backing connection.
	MULTIPLEXED_CONN_TYPE = "tcp"

	// ROUTE_SERVER is the host on which the backing services are reached.
	ROUTE_SERVER = "localhost"

	// LISTEN_PORT is the port the demonstration HTTP API is served on.
	LISTEN_PORT = 3000
)
// Request is the JSON document a client POSTs to the SocketMultiplexer. It is
// intentionally minimal; in a real application the client request would
// likely carry more business-specific fields.
type Request struct {
	// RouteId picks which of the registered backing connections to use.
	RouteId string
	// Data is the payload to send over that connection.
	Data string
}
// Response is the JSON document the SocketMultiplexer returns to a client. It
// is intentionally minimal; in a real application the server response would
// likely carry more business-specific fields.
type Response struct {
	// Success reports whether the request was successful.
	Success bool
	// Data is the response read back from the backing connection.
	Data string
}
// A Route allows for synchronous request/response communication over a single
// network stream. The writer and scanner dictate the dialect that is spoken
// on the stream; as configured by NewRoute, messages are newline-delimited.
//
// A Route is safe for concurrent use: Transmit holds mutex for a complete
// write-then-read exchange, so exchanges from different callers never
// interleave on the connection.
type Route struct {
	mutex   sync.Mutex     // held for the whole exchange in Transmit
	conn    net.Conn       // underlying network stream
	writer  *bufio.Writer  // buffers an outgoing message until it is flushed
	scanner *bufio.Scanner // splits incoming bytes into lines
}

// NewRoute initializes a Route on top of the given network connection, with a
// default bufio writer and line scanner attached to it. A real application
// would likely configure the reader and writer for the specific protocol
// spoken on the connection.
func NewRoute(conn net.Conn) *Route {
	return &Route{
		conn:    conn,
		writer:  bufio.NewWriter(conn),
		scanner: bufio.NewScanner(conn),
	}
}

// Transmit sends data, followed by "\r\n", over the Route's connection and
// then blocks until one line of response has been read. The returned slice is
// the response line without its line terminator and is owned by the caller.
//
// An error is returned when the request cannot be written, when reading the
// response fails, or when the peer closes the connection before a complete
// response arrives. The response slice is nil whenever the error is non-nil.
func (r *Route) Transmit(data []byte) ([]byte, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Write the request and its terminator, then push it onto the wire.
	if _, err := r.writer.Write(data); err != nil {
		return nil, err
	}
	if _, err := r.writer.WriteString("\r\n"); err != nil {
		return nil, err
	}
	if err := r.writer.Flush(); err != nil {
		return nil, err
	}

	// Read the response. Scan reports false both on a read error and on a
	// clean end of stream (in which case Err is nil); neither is a response.
	if !r.scanner.Scan() {
		if err := r.scanner.Err(); err != nil {
			return nil, fmt.Errorf("reading response: %w", err)
		}
		return nil, fmt.Errorf("reading response: connection closed by peer")
	}

	// Bytes returns a view into the scanner's internal buffer that a later
	// Scan may overwrite. Copy it so the result stays valid once the mutex is
	// released and another caller starts its own exchange.
	line := r.scanner.Bytes()
	response := make([]byte, len(line))
	copy(response, line)
	return response, nil
}
// A SocketMultiplexer server exposes a simple HTTP API to clients, listening for | |
// requests and automatically routing them to the appropriate underlying network stream. | |
// Client data is forwarded as raw bytes, and the response is collected and returned | |
// as a regular HTTP response. | |
// | |
// The SocketMultiplexer ensures that concurrent requests to the same network stream do | |
// not mangle eachother, and maintains long-lived connections to avoid handshake | |
// overhead. | |
type SocketMultiplexer struct { | |
listenPort int | |
routesMutex sync.RWMutex | |
routes map[string]*Route | |
} | |
func NewSocketMultiplexer(listenPort int) *SocketMultiplexer { | |
return &SocketMultiplexer{listenPort: listenPort, routes: make(map[string]*Route)} | |
} | |
func (s *SocketMultiplexer) Port() int { | |
return s.listenPort | |
} | |
// Listen starts up a blocking HTTP server that listens for client requests and | |
// forwards them on to the appropriate SocketMultiplexer handler. (Currently just a | |
// single demonstration handler, but more could easily be registered based on use case) | |
// Note that each incoming request is handled by its own go routine, so multiple | |
// requests can run concurrently. This increases throughput, but requires that | |
// the SocketMultiplexer internals are threadsafe. | |
func (s *SocketMultiplexer) Listen() error { | |
mux := http.NewServeMux() | |
// Register external HTTP API for clients to interact with | |
mux.HandleFunc("/transmit", s.transmitHandler) | |
log.Printf("Listening on port %d", s.listenPort) | |
return http.ListenAndServe(fmt.Sprintf(":%d", s.listenPort), mux) | |
} | |
// transmitHandler orchestrates multiplexing an incoming request over the appropriate | |
// underlying network stream. It is responsible for parsing and validating the | |
// supplied request, routing and transmitting data, and packaging up the underlying | |
// response in a form that the caller can understand. | |
// | |
// For demonstration purposes, we use a simple JSON request/ response format, but | |
// this can easily be changed to whatever format clients expect. | |
func (s *SocketMultiplexer) transmitHandler(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodPost { | |
log.Println("Ignoring non-POST request") | |
http.Error(w, "Can only transmit via POST request", http.StatusMethodNotAllowed) | |
return | |
} | |
// Parse request | |
var request Request | |
err := json.NewDecoder(r.Body).Decode(&request) | |
if err != nil { | |
log.Println("Unable to decode request:", err.Error()) | |
http.Error(w, "Unable to decode request", http.StatusInternalServerError) | |
return | |
} | |
// Route request | |
route, ok := s.getRoute(request.RouteId) | |
if !ok { | |
http.Error( | |
w, | |
fmt.Sprintf("No route established for the given ID (%s)", request.RouteId), | |
http.StatusBadRequest) | |
return | |
} | |
responseData, err := s.transmitData(route, request.Data) | |
if err != nil { | |
log.Println("Unable to communicate with underlying network stream", err.Error()) | |
http.Error(w, "Network connectivity issue", http.StatusInternalServerError) | |
return | |
} | |
// Respond | |
response := Response{Success: true, Data: responseData} | |
// Note that while "success" is hardcoded to true here, in a real application, | |
// its value would depend on whether the service at the other end of the | |
// connection thought the request was successful or not | |
w.Header().Set("Content-Type", "application/json") | |
json.NewEncoder(w).Encode(response) | |
} | |
// AddRoute adds a TCP stream for the given host and port identified by the supplied | |
// route ID. Later requests and responses can use that route ID to send and receive data | |
// on that stream. | |
// | |
// Errors if the route ID already exists to avoid overwriting an active stream | |
func (s *SocketMultiplexer) AddRoute(routeId string, host string, port int) error { | |
s.routesMutex.Lock() | |
defer s.routesMutex.Unlock() | |
if _, ok := s.routes[routeId]; ok { | |
return fmt.Errorf("Route ID %s already exists", routeId) | |
} | |
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, net.JoinHostPort(host, strconv.Itoa(port))) | |
if err != nil { | |
return err | |
} | |
s.routes[routeId] = NewRoute(conn) | |
return nil | |
} | |
// getRoute fetches the route associated with the given routeId in a threadsafe fashion | |
func (s *SocketMultiplexer) getRoute(routeId string) (*Route, bool) { | |
s.routesMutex.RLock() | |
defer s.routesMutex.RUnlock() | |
route, ok := s.routes[routeId] | |
return route, ok | |
} | |
// transmitData transmits the given data over the supplied route. Note that once you | |
// have a pointer to a specific route, you aren't required to protect it against | |
// concurrent access, because the Route structure itself is designed to be threadsafe. | |
func (s *SocketMultiplexer) transmitData(route *Route, data string) (string, error) { | |
responseBytes, err := route.Transmit([]byte(data)) | |
return string(responseBytes), err | |
} | |
// Demonstration usage of the SocketMultiplexer. | |
// | |
// Here, we create a new SocketMultiplexer with hardcoded, preregistered | |
// routes. In a real application, routes would likely be part of a config file, or | |
// registered on the fly, both of which are fully supported via the AddRoute function. | |
// We only hardcode them here for simplicity and easy of demonstration. | |
// | |
// In order to run this main method, you'll first have to run a server capable of | |
// ACKing TCP connections on ports 3001 and 3002. The simplest way to do that is via | |
// the netcat tool: `nc -l PORT_NUMBER`. In our case, that means 2 instances of netcat | |
// running on ports 3001 and 3002. These two netcat processes simulate the types of | |
// raw socket communicating network services that this SocketMultiplexer is useful in | |
// fronting. | |
// | |
// Once that's done, you can run the SocketMultiplexer via your standard `go run` | |
// command (or equivalent). This will start a blocking http server, as well as | |
// initiate connections to the specified server + ports. | |
// | |
// With those two pieces in place, you can see the entire system working end to end by | |
// mimicing a client request. Using cURL (or equivalent), you can make a POST request | |
// to the port that the SockerMultiplexer server is running (3000 by default). | |
// For example: `curl --header "Content-Type: application/json" --request POST --data '{"RouteId": "a", "Data": "test request"}' http://localhost:3000/transmit` | |
// You will see the request data you submitted reflected in the netcat output, and | |
// should be able to type a response and have it appear as a response to the cURL | |
// request. This demonstrates that the client HTTP requests are getting correctly | |
// multiplexed over the underlying raw network connections. | |
// | |
// For more comprehensive tests, see the accompanying Go test suite. | |
func main() { | |
server := NewSocketMultiplexer(LISTEN_PORT) | |
err1 := server.AddRoute("a", ROUTE_SERVER, 3001) | |
err2 := server.AddRoute("b", ROUTE_SERVER, 3002) | |
if err1 != nil || err2 != nil { | |
log.Panicln("Failed to initialize socket routes:", err1, err2) | |
} | |
log.Fatal(server.Listen()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"fmt" | |
"math/rand" | |
"net" | |
"net/http" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"testing" | |
"time" | |
) | |
// Fixed messages exchanged by the tests that use a HardCodedResponder.
const (
	// TestRequest is the message those tests send.
	TestRequest = "testRequest"
	// TestResponse is the reply those tests expect back.
	TestResponse = "testResponse"
)
// AVAILABLE_PORT is an atomic counter holding the most recently handed-out
// port number. It lets tests in this suite that run in parallel each listen on
// their own port instead of colliding (assuming no external service is using
// the ports that follow 3000).
var AVAILABLE_PORT int32 = 3000

// nextPortNum atomically advances AVAILABLE_PORT and returns the new value, so
// no two callers are ever given the same port.
func nextPortNum() int {
	next := atomic.AddInt32(&AVAILABLE_PORT, 1)
	return int(next)
}
// A Delayer implements a delay strategy to artificially simulate load.
type Delayer interface {
	Delay()
}

// NoDelayer is a Delayer that doesn't do any delaying.
type NoDelayer struct{}

// Delay returns immediately.
func (n *NoDelayer) Delay() {}

// RandomDelayer is a Delayer that sleeps for a random whole number of
// milliseconds in the range [0, maxDelayMillis).
type RandomDelayer struct {
	maxDelayMillis int // exclusive upper bound on the delay, in milliseconds
}

// Delay sleeps for a randomly chosen duration below maxDelayMillis
// milliseconds. A non-positive maxDelayMillis (including the zero value of
// RandomDelayer) means no delay at all.
func (r *RandomDelayer) Delay() {
	// rand.Intn panics when its argument is <= 0, so treat that as "no delay".
	if r.maxDelayMillis <= 0 {
		return
	}
	time.Sleep(time.Duration(rand.Intn(r.maxDelayMillis)) * time.Millisecond)
}
// A Responder decides whether an incoming test request message is acceptable
// and builds the response message to send back for it.
type Responder interface {
	ValidRequest(string) bool
	Respond(string) string
}

// HardCodedResponder is a Responder that accepts exactly one fixed request
// and always answers with one fixed response.
type HardCodedResponder struct {
	request  string
	response string
}

// ValidRequest reports whether message equals the configured request.
func (h *HardCodedResponder) ValidRequest(message string) bool {
	return h.request == message
}

// Respond returns the configured response, whatever the message was.
func (h *HardCodedResponder) Respond(message string) string {
	return h.response
}

// EchoResponder is a Responder that answers with the request message it
// received and performs no validation on it.
//
// It is a simple way to check that clients receive the right data from a
// multiplexed connection: each client sends something unique and verifies that
// the same unique data comes back.
type EchoResponder struct{}

// ValidRequest accepts every message.
func (e *EchoResponder) ValidRequest(message string) bool {
	return true
}

// Respond returns message unchanged.
func (e *EchoResponder) Respond(message string) string {
	return message
}
// TestServer is a minimal implementation of a server that communicates directly via | |
// a raw network connection. It mimics the salient behavior of the type of servers | |
// that the SocketMultiplexer is meant to front without caring about the actual data | |
// being sent and received. | |
// | |
// Importantly, a given instance of TestServer only accepts a single connection before | |
// shutting down (but loops forever on communication over that single connection). This | |
// is in contrast to a normal server, which listens continually on its port, accepting | |
// as many connections as possible. This behavior ensures that the SocketMultiplexer is | |
// keeping connections open as expected, because otherwise it will lose it's ability to | |
// communicate with the TestServer. | |
type TestServer struct { | |
connType string | |
port int | |
delayer Delayer | |
responder Responder | |
shutdownTrigger chan struct{} | |
} | |
// NewTestServer initializes a new TestServer to listen on the provided port. | |
// The provided Delayer is used to determine the network/ server latency strategy to | |
// simulate. Likewise, the provided Responder is used to determine the request/ response | |
// validation strategy. | |
func NewTestServer(port int, delayer Delayer, responder Responder) *TestServer { | |
return &TestServer{ | |
connType: MULTIPLEXED_CONN_TYPE, | |
port: port, | |
delayer: delayer, | |
responder: responder, | |
shutdownTrigger: make(chan struct{}), | |
} | |
} | |
// Address returns the dialable address that this server is listening on | |
func (s *TestServer) Address() string { | |
return fmt.Sprintf("%s:%d", ROUTE_SERVER, s.port) | |
} | |
// Serve causes the TestServer to block and listen for a single connection on the | |
// associated port. Once a client connects, the connection remains open until the client | |
// disconnects, with data managed by the TestServer's Responder. | |
func (s *TestServer) Serve(t *testing.T) { | |
l, err := net.Listen(s.connType, fmt.Sprintf(":%d", s.port)) | |
if err != nil { | |
t.Fatal(err) | |
} | |
defer l.Close() | |
t.Log("Waiting for connection") | |
conn, err := l.Accept() | |
t.Log("Got a connection") | |
if err != nil { | |
t.Fatal(err) | |
} | |
defer conn.Close() | |
// Loop, receiving and sending the specified messages. Note that we don't close the | |
// connection after every successful negotiation like you would do if you were dealing | |
// with an http connection. This simulates the kind of long running network connection | |
// that the SocketMultiplexer is made to deal with. | |
for { | |
select { | |
case <-s.shutdownTrigger: | |
return | |
default: | |
scanner := bufio.NewScanner(conn) | |
successfulScan := scanner.Scan() | |
if !successfulScan { | |
t.Log("Connection closed") | |
return | |
} | |
receivedMessage := scanner.Text() | |
if !s.responder.ValidRequest(receivedMessage) { | |
t.Error("Received invalid message:", receivedMessage) | |
} | |
// Simulate artificial delay | |
s.delayer.Delay() | |
writer := bufio.NewWriter(conn) | |
_, err1 := writer.WriteString(s.responder.Respond(receivedMessage) + "\r\n") | |
err2 := writer.Flush() | |
if err1 != nil || err2 != nil { | |
t.Error("Couldn't write response", err1, err2) | |
} | |
} | |
} | |
} | |
// Shutdown the TestServer and break out of its network connection listen loop | |
func (s *TestServer) Shutdown() { | |
close(s.shutdownTrigger) | |
} | |
// Test to ensure that the Route construct can successfully transmit data over its | |
// underlying network connection | |
func TestRouteConnections(t *testing.T) { | |
ts := NewTestServer(nextPortNum(), &NoDelayer{}, &HardCodedResponder{request: TestRequest, response: TestResponse}) | |
go ts.Serve(t) | |
defer ts.Shutdown() | |
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, ts.Address()) | |
if err != nil { | |
t.Error("Unable to connect to test server") | |
} | |
testRoute := NewRoute(conn) | |
response, err := testRoute.Transmit([]byte(TestRequest)) | |
if err != nil { | |
t.Error("Unable to transmit data") | |
} | |
if string(response) != TestResponse { | |
t.Errorf("Expected %s, but got %s", TestResponse, response) | |
} | |
} | |
// Test to ensure that the SocketMultiplexer properly supports adding routes, fetching | |
// routes, and handling unspecified routes | |
func TestRouteValidation(t *testing.T) { | |
port := nextPortNum() | |
ts := NewTestServer(port, &NoDelayer{}, &EchoResponder{}) | |
go ts.Serve(t) | |
defer ts.Shutdown() | |
muxServer := NewSocketMultiplexer(3000) | |
err := muxServer.AddRoute("a", ROUTE_SERVER, port) | |
if err != nil { | |
t.Fatal("Failed to initialize socket routes:", err) | |
} | |
_, ok := muxServer.getRoute("a") | |
if !ok { | |
t.Error("Expected valid route to return OK") | |
} | |
_, ok = muxServer.getRoute("b") | |
if ok { | |
t.Error("Expected invalid route to return false OK value") | |
} | |
} | |
// Test to ensure that a route stored in the SocketMultiplexer can be successfully | |
// retrieved and transmitted over. | |
func TestMultiplexerTransmission(t *testing.T) { | |
port := nextPortNum() | |
ts := NewTestServer(port, &NoDelayer{}, &HardCodedResponder{request: TestRequest, response: TestResponse}) | |
go ts.Serve(t) | |
defer ts.Shutdown() | |
newRouteId := "c" | |
muxServer := NewSocketMultiplexer(3000) | |
err := muxServer.AddRoute(newRouteId, "localhost", port) | |
if err != nil { | |
t.Error("Unable to create new multiplexed route", err) | |
} | |
route, ok := muxServer.getRoute(newRouteId) | |
if !ok { | |
t.Errorf("Newly created route %v not found", newRouteId) | |
} | |
resp, err := muxServer.transmitData(route, TestRequest) | |
if err != nil { | |
t.Error("Couldn't transmit data on newly created route", err) | |
} | |
if resp != TestResponse { | |
t.Errorf("Expected response of %s on new route, but got %s", TestResponse, resp) | |
} | |
} | |
// Helper to simulate and submit a client request, and do intermediate validation | |
// before returning the response. | |
func SubmitClientRequest(t *testing.T, address string, jsonBlob string) Response { | |
resp, err := http.Post(address, "application/json", strings.NewReader(jsonBlob)) | |
if err != nil { | |
t.Fatalf("Error making client request %q to address %q: %v", jsonBlob, address, err) | |
} | |
if resp.StatusCode != http.StatusOK { | |
t.Errorf("Received %v status code for request %q", resp.StatusCode, jsonBlob) | |
} | |
var response Response | |
err = json.NewDecoder(resp.Body).Decode(&response) | |
if err != nil { | |
t.Errorf("Unable to decode response for request %q: %v", jsonBlob, err) | |
} | |
return response | |
} | |
// Test to ensure that Routes are usable for many, long-lived requests, with arbitrary | |
// network delays. | |
func TestRouteConnectionWithDelay(t *testing.T) { | |
delayer := &RandomDelayer{maxDelayMillis: 1000} | |
responder := &HardCodedResponder{request: TestRequest, response: TestResponse} | |
ts := NewTestServer(nextPortNum(), delayer, responder) | |
go ts.Serve(t) | |
defer ts.Shutdown() | |
// Note that we initialize the connection once up front and then use it (via the | |
// Route construct) many times, which is precisely what the SocketMultiplexer needs | |
// to do under the hood. | |
conn, err := net.Dial(MULTIPLEXED_CONN_TYPE, ts.Address()) | |
if err != nil { | |
t.Error("Unable to connect to test server") | |
} | |
testRoute := NewRoute(conn) | |
iterations := 10 | |
for n := 0; n < iterations; n++ { | |
response, err := testRoute.Transmit([]byte(TestRequest)) | |
if err != nil { | |
t.Errorf("[%v/%v] Unable to transmit data", n, iterations) | |
} | |
if string(response) != TestResponse { | |
t.Errorf("[%v/%v] Expected %s, but got %s", n, iterations, TestResponse, response) | |
} | |
} | |
} | |
// Helper to automatically spin up a SocketMultipler with each of the desired routes | |
// automatically associated with an arbitrary network connection backed by a TestServer. | |
// | |
// This is a means of reducing the redundant plumbing needed to hook up a working | |
// test SocketMultiplexer with valid backing network connections. | |
func NewMuxWithBackingConns(t *testing.T, desiredRoutes []string, delayer Delayer, responder Responder) (*SocketMultiplexer, []*TestServer) { | |
muxServer := NewSocketMultiplexer(nextPortNum()) | |
testServers := make([]*TestServer, len(desiredRoutes)) | |
for i, route := range desiredRoutes { | |
port := nextPortNum() | |
ts := NewTestServer(port, delayer, responder) | |
go ts.Serve(t) | |
testServers[i] = ts | |
err := muxServer.AddRoute(route, ROUTE_SERVER, port) | |
if err != nil { | |
t.Error("Unable to create new multiplexed route", err) | |
} | |
} | |
return muxServer, testServers | |
} | |
// Helper to shutdown a collection of TestServers in one go | |
func massShutdown(testServers []*TestServer) { | |
for _, ts := range testServers { | |
ts.Shutdown() | |
} | |
} | |
// Test to ensure that client requests are correctly multiplexed by the SocketMultiplexer | |
// to the underlying network connections, and responses are correctly returned to the | |
// client via HTTP response | |
func TestMultiplexingRoutes(t *testing.T) { | |
routes := []string{"d", "e"} | |
muxServer, testServers := NewMuxWithBackingConns(t, routes, &NoDelayer{}, &EchoResponder{}) | |
go muxServer.Listen() | |
defer massShutdown(testServers) | |
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port()) | |
requestTemplate := `{"RouteId": "%s", "Data": "%s-%v"}` | |
responseTemplate := "%s-%v" | |
for _, route := range routes { | |
request := fmt.Sprintf(requestTemplate, route, TestRequest, route) | |
response := SubmitClientRequest(t, address, request) | |
if response.Data != fmt.Sprintf(responseTemplate, TestRequest, route) { | |
t.Errorf("Received incorrect data from multiplexed connection for route %q", route) | |
} | |
} | |
} | |
// chooseRandomly returns a randomly selected element of choices. It is useful
// for simulating random load on the SocketMultiplexer from clients looking to
// communicate on arbitrary routes.
//
// An empty (or nil) choices yields the empty string.
func chooseRandomly(choices []string) string {
	// rand.Intn panics when given 0, so an empty list needs its own answer.
	if len(choices) == 0 {
		return ""
	}
	return choices[rand.Intn(len(choices))]
}
// Test to ensure that the SocketMultiplexer is capable of handling work from multiple | |
// clients in parallel while properly routing data between the backing network | |
// connections. | |
func TestMultiplexingConcurrentConnections(t *testing.T) { | |
routes := []string{"f", "g"} | |
muxServer, testServers := NewMuxWithBackingConns(t, routes, &NoDelayer{}, &EchoResponder{}) | |
go muxServer.Listen() | |
defer massShutdown(testServers) | |
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port()) | |
requestTemplate := `{"RouteId": "%s", "Data": "ThreadId[%d]-RouteId[%s]"}` | |
responseTemplate := "ThreadId[%d]-RouteId[%s]" | |
threads := 10 | |
var wg sync.WaitGroup | |
for n := 0; n < threads; n++ { | |
wg.Add(1) | |
go func(threadId int) { | |
t.Logf("Thread %d starting", threadId) | |
defer wg.Done() | |
routeId := chooseRandomly(routes) | |
response := SubmitClientRequest(t, address, fmt.Sprintf(requestTemplate, routeId, threadId, routeId)) | |
if response.Success != true { | |
t.Error("Expected successful response") | |
} | |
expectedResponseData := fmt.Sprintf(responseTemplate, threadId, routeId) | |
if response.Data != expectedResponseData { | |
t.Errorf("Response data %q differs from expected response of %q", response.Data, expectedResponseData) | |
} | |
}(n) | |
} | |
wg.Wait() // Wait for all the goroutines to finish before considering the test done | |
} | |
// Test to ensure that the SocketMultiplexer is capable of handling work from multiple | |
// clients in parallel while properly routing data between the backing network | |
// connections even in the face of network/ backend delays. | |
func TestMultiplexingConcurrentConnectionsWithDelay(t *testing.T) { | |
routes := []string{"h", "i", "j"} | |
delayer := &RandomDelayer{maxDelayMillis: 1000} | |
muxServer, testServers := NewMuxWithBackingConns(t, routes, delayer, &EchoResponder{}) | |
go muxServer.Listen() | |
defer massShutdown(testServers) | |
address := fmt.Sprintf("http://localhost:%v/transmit", muxServer.Port()) | |
requestTemplate := `{"RouteId": "%s", "Data": "ThreadId[%d]-RouteId[%s]"}` | |
responseTemplate := "ThreadId[%d]-RouteId[%s]" | |
threads := 30 | |
var wg sync.WaitGroup | |
for n := 0; n < threads; n++ { | |
wg.Add(1) | |
go func(threadId int) { | |
t.Logf("Thread %d starting", threadId) | |
defer wg.Done() | |
routeId := chooseRandomly(routes) | |
response := SubmitClientRequest(t, address, fmt.Sprintf(requestTemplate, routeId, threadId, routeId)) | |
if response.Success != true { | |
t.Error("Expected successful response") | |
} | |
expectedResponseData := fmt.Sprintf(responseTemplate, threadId, routeId) | |
if response.Data != expectedResponseData { | |
t.Errorf("Response data %q differs from expected response of %q", response.Data, expectedResponseData) | |
} | |
}(n) | |
} | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment