Skip to content

Instantly share code, notes, and snippets.

@johnrichardrinehart
Created May 14, 2020 18:46
Show Gist options
  • Save johnrichardrinehart/185fd48113a1e656b56d5bafcb777091 to your computer and use it in GitHub Desktop.
Save johnrichardrinehart/185fd48113a1e656b56d5bafcb777091 to your computer and use it in GitHub Desktop.
An HTTP server managing one subscription broker per URL path
package main
import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"time"

	"other/module/broker" // https://gist.github.com/johnrichardrinehart/8aae1d73b5bdda33eabff1cd91eb7979
)
type (
	// client bundles the request and the response writer of one HTTP
	// connection so both can be handed to WriteToClient as a single value.
	client struct {
		r *http.Request
		w http.ResponseWriter
	}

	// server is the http.Handler of this program. It keeps one broker per
	// URL path that has been requested so far.
	//
	// NOTE(review): the path key type is not declared in the visible part of
	// this file — confirm where it is defined.
	server struct {
		// errChan carries runtime errors to whoever logs them (main).
		errChan chan error
		// brokers maps a request path to its broker,
		// e.g. map["/path/1"]*broker.Broker.
		brokers map[path]*broker.Broker
		// done is closed to request shutdown of the HTTP server.
		done chan bool
	}
)
// main wires up the HTTP server, logs every error reported on the server's
// error channel, and blocks until SIGINT. On SIGINT it requests a shutdown
// and waits for the server goroutine to finish before exiting.
func main() {
	// configure server instance
	srv := server{
		errChan: make(chan error),
		brokers: make(map[path]*broker.Broker),
		done:    make(chan bool),
	}
	// Addr is left empty, so net/http falls back to its default ":http".
	httpServer := http.Server{
		Handler: &srv,
	}

	// ==== log runtime errors ====
	// Started before the server so that a send on the unbuffered errChan
	// always finds a receiver.
	go func() {
		for err := range srv.errChan {
			log.Printf("encountered error: %s", err)
		}
	}()

	// stopped is closed once srv.ListenAndServe has returned.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		srv.ListenAndServe(&httpServer)
	}()

	// ==== graceful cleanup ====
	kill := make(chan os.Signal, 1)
	signal.Notify(kill, os.Interrupt) // SIGINT (<ctrl>-C)

	<-kill          // blocks until SIGINT
	close(srv.done) // shut down the HTTP server

	// Returning from main ends the process immediately, so wait here;
	// otherwise the shutdown triggered above would never get to run.
	<-stopped
	log.Println("exiting...")
}
// brokersMu guards every access to server.brokers. net/http invokes
// ServeHTTP on a separate goroutine per request, and unsynchronised map
// reads and writes are a data race. It is declared at package scope so that
// the server struct definition stays unchanged; it can become a server field
// if that struct is ever reworked.
var brokersMu sync.Mutex

// ServeHTTP is the entry point for HTTP clients.
// For a path that has not been accessed yet, ServeHTTP creates a new broker,
// starts it, and spawns a lookForMessages goroutine for it.
// For an already-accessed path it only subscribes the new client to the
// existing broker.
// In both cases the call then blocks in WriteToClient.
//
// NOTE(review): b.publishCh is an unexported field name; if Broker is
// declared in the imported broker package this access will not compile —
// confirm against the broker API.
func (srv *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	p := path(r.URL.Path)

	// Look up and, if necessary, create the broker under one lock so two
	// simultaneous first requests for a path cannot create two brokers.
	brokersMu.Lock()
	b, ok := srv.brokers[p]
	if !ok {
		// Newly-accessed path: this broker serves this client and all
		// future clients of the same path.
		b = broker.New()
		go b.Start()
		srv.brokers[p] = b
		go lookForMessages(b.publishCh, srv.done, srv.errChan)
	}
	brokersMu.Unlock()

	sub := make(subscriber)
	b.Subscribe(sub)
	srv.WriteToClient(client{r: r, w: w}, sub, b.Unsubscribe) // blocks until client disconnects
}
// WriteToClient receives messages from sub and writes each one to the
// client's http.ResponseWriter, flushing after every message when the writer
// supports http.Flusher.
//
// It returns when
//   - the request context is done (client disconnected): unsub(sub) is called,
//   - srv.done is closed (server shutting down): unsub(sub) is called,
//   - sub is closed: returns without unsubscribing.
//
// Write problems are reported on srv.errChan; the loop keeps running after
// them and ends through one of the conditions above.
func (srv *server) WriteToClient(c client, sub subscriber, unsub func(subscriber)) {
	// Headers have to be set before WriteHeader: net/http ignores changes
	// to the header map once the status line has been written.
	c.w.Header().Set("Content-Type", "application/octet-stream")
	c.w.WriteHeader(http.StatusOK)

	// The writer does not change during the request, so assert once.
	flusher, canFlush := c.w.(http.Flusher)
	ctx := c.r.Context()

	for {
		// Wait on all events at once so that a disconnect or shutdown is
		// noticed even while no message is pending.
		select {
		case <-ctx.Done():
			// client has disconnected so unsubscribe and stop writing
			log.Println("HTTP client disconnected")
			unsub(sub)
			return
		case <-srv.done:
			// server is shutting down; release the subscription so the
			// handler returns and does not hold the connection open
			unsub(sub)
			return
		case msg, ok := <-sub:
			if !ok {
				return // sub was closed; nothing more will arrive
			}
			n, err := c.w.Write(msg)
			if err != nil {
				srv.errChan <- fmt.Errorf("write error to client: %w", err)
				continue
			}
			if n != len(msg) {
				srv.errChan <- fmt.Errorf("unexpected writing bytes => %d, expected => %d", n, len(msg))
			}
			if canFlush {
				flusher.Flush()
			}
		}
	}
}
// lookForMessages is the per-path message source: ServeHTTP spawns one
// goroutine running it for every path that is accessed for the first time.
// It is meant to pass newly discovered messages to pub (the broker's publish
// channel), to stop when done is closed, and to report runtime errors on
// errCh.
//
// NOTE(review): the body is currently an empty placeholder, so the function
// returns immediately without using any of its arguments.
func lookForMessages(pub chan message, done chan bool, errCh chan error) {
	// pub <- newMsg (pass newly-discovered messages to pub)
	// <-done (in case the parent wants us to stop looking (e.g. server shutdown))
	// errCh <- uhoh (to signal the parent about runtime errors encountered in looking for messages)
}
// ListenAndServe starts httpServer in a background goroutine and then blocks
// until srv.done is closed. It then shuts the server down gracefully,
// allowing at most 10 seconds for the shutdown to complete.
//
// Errors from serving (other than http.ErrServerClosed) and from shutting
// down are reported on srv.errChan.
func (srv *server) ListenAndServe(httpServer *http.Server) {
	defer log.Println("closed HTTP server")

	go func() {
		log.Printf("HTTP server listening at %s", httpServer.Addr)
		// http.ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			srv.errChan <- err
		}
	}()

	// Block until shutdown is requested.
	<-srv.done

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := httpServer.Shutdown(shutdownCtx); err != nil {
		srv.errChan <- fmt.Errorf("error shutting down server: %w", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment