Created
May 14, 2020 18:46
-
-
Save johnrichardrinehart/185fd48113a1e656b56d5bafcb777091 to your computer and use it in GitHub Desktop.
An HTTP server managing one subscription broker per URL path
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"net/http" | |
"os" | |
"os/signal" | |
"time" | |
"other/module/broker" // https://gist.github.com/johnrichardrinehart/8aae1d73b5bdda33eabff1cd91eb7979 | |
) | |
type ( | |
client struct { | |
r *http.Request | |
w http.ResponseWriter | |
} | |
server struct { | |
errChan chan error | |
brokers map[path]*broker // e.g. map["/path/1"]*broker | |
done chan bool | |
} | |
) | |
func main() { | |
// configure server instance | |
srv := server{ | |
errChan: make(chan error), | |
brokers: make(map[path]*broker.Broker), | |
done: make(chan bool)} | |
httpServer := http.Server{ | |
Handler: &srv, | |
} | |
go srv.ListenAndServe(&httpServer) | |
// ==== graceful cleanup ==== | |
kill := make(chan os.Signal, 1) | |
signal.Notify(kill, os.Interrupt) // SIGINT (<ctrl>-C) | |
// ==== log runtime errors ==== | |
go func() { | |
for err := range srv.errChan { | |
log.Printf("encountered error: %s", err) | |
} | |
}() | |
<-kill // blocks until SIGINT | |
close(srv.done) // shut down the HTTP server | |
log.Println("exiting...") | |
} | |
// ServeHTTP is the entry point for HTTP clients | |
// For yet unaccessed paths ServeHTTP creates a new broker and subscribes the first HTTP client to this broker | |
// For already-accessed paths it anly subscribes the new client to the already-existing broker | |
func (srv *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
p := path(r.URL.Path) | |
b, ok := srv.brokers[p] | |
// If it's a newly-accessed path then we need a new broker for this client and any future ones | |
if !ok { | |
b = broker.New() | |
go b.Start() | |
srv.brokers[p] = b | |
go func() { | |
lookForMessages(b.publishCh, srv.done, srv.errChan) // blocks until close(s.done) | |
}() | |
} | |
sub := make(subscriber) | |
b.Subscribe(sub) | |
srv.WriteToClient(client{r, w}, sub, b.Unsubscribe) // blocks until client disconnects | |
} | |
// WriteToClient manages the reception of messages from the broker and writing them to the http.ResponseWriter | |
// net/http uses HTTP chunked encoding with this design | |
func (srv *server) WriteToClient(c client, sub subscriber, unsub func(subscriber)) { | |
c.w.WriteHeader(http.StatusOK) | |
c.w.Header().Set("Content-Type", "application/octet-stream") | |
for msg := range sub { | |
select { | |
// client has disconnected so unsubscribe and stop writing | |
case <-c.r.Context().Done(): | |
log.Println("HTTP client disconnected") | |
unsub(sub) | |
return | |
// write to client | |
default: | |
n, err := c.w.Write(msg) | |
if err != nil { | |
srv.errChan <- fmt.Errorf("write error to client: %s", err) | |
continue | |
} | |
if n != len(msg) { | |
srv.errChan <- fmt.Errorf("unexpected writing bytes => %d, expected => %d", n, len(msg)) | |
} | |
if flusher, ok := c.w.(http.Flusher); ok { | |
flusher.Flush() | |
} | |
} | |
} | |
} | |
// lookForMessages goroutines are spawned for every new path accessed by an HTTP client. | |
// lookForMessages uses the broker's pub channel to send messages to subscribers in the WriteToClient goroutine | |
// for each subscriber | |
func lookForMessages(pub chan message, done chan bool, errCh chan error) { | |
// pub <- newMsg (pass newly-discovered messages to pub) | |
// <-done (in case the parent wants us to stop looking (e.g. server shutdown)) | |
// errCh <- uhoh (to signal the parent about runtime errors encountered in looking for messages) | |
} | |
// ListenAnServe starts the HTTP server and handles graceful cleanup in the case of a kill signal | |
func (srv *server) ListenAndServe(httpServer *http.Server) { | |
defer func() { | |
log.Println("closed HTTP server") | |
}() | |
go func() { | |
log.Printf("HTTP server listening at %s", httpServer.Addr) | |
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { | |
srv.errChan <- err | |
} | |
}() | |
// wait on a context cancellation, gracefully cleaning up at the end | |
select { | |
case <-srv.done: | |
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
defer cancel() | |
if err := httpServer.Shutdown(shutdownCtx); err != nil { | |
srv.errChan <- fmt.Errorf("error shutting down server: %s", err) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment