Created
January 3, 2020 12:24
-
-
Save lnsp/e3987b2ce6aac8e5c2259fd214746d06 to your computer and use it in GitHub Desktop.
Simple HTTP Pub/Sub server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"sync" | |
) | |
// NewTopic creates a new topic. | |
func NewTopic() *Topic { | |
t := &Topic{ | |
mu: make(chan bool, 1), | |
} | |
t.mu <- true | |
return t | |
} | |
type Topic struct { | |
mu chan bool | |
subs []Subscriber | |
} | |
func (t *Topic) TryLock(ctx context.Context) bool { | |
select { | |
case <-t.mu: | |
return true | |
case <-ctx.Done(): | |
return false | |
} | |
} | |
func (t *Topic) Unlock() { | |
t.mu <- true | |
} | |
func (t *Topic) Publish(ctx context.Context, msg []byte) { | |
if !t.TryLock(ctx) { | |
return | |
} | |
subs := t.subs[:] | |
t.subs = nil | |
t.Unlock() | |
for _, s := range subs { | |
select { | |
case s.Buf <- msg: | |
close(s.Buf) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
func (t *Topic) Subscribe(ctx context.Context) (Subscriber, error) { | |
if !t.TryLock(ctx) { | |
return Subscriber{}, fmt.Errorf("could not aquire topic lock") | |
} | |
sub := Subscriber{ctx, make(chan []byte)} | |
t.subs = append(t.subs, sub) | |
t.Unlock() | |
return sub, nil | |
} | |
// Subscriber is one pending receiver of a published message.
type Subscriber struct {
	// Context bounds how long the subscriber waits; Receive gives up once it
	// is done.
	Context context.Context

	// Buf carries the published message to the subscriber.
	Buf chan []byte
}
func (s *Subscriber) Receive() []byte { | |
select { | |
case msg := <-s.Buf: | |
return msg | |
case <-s.Context.Done(): | |
return nil | |
} | |
} | |
func main() { | |
var mu sync.Mutex | |
topics := make(map[string]*Topic) | |
handler := func(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodGet && r.Method != http.MethodPost { | |
http.Error(w, "method not allowed", http.StatusMethodNotAllowed) | |
return | |
} | |
mu.Lock() | |
topic, ok := topics[r.URL.Path] | |
if !ok { | |
topic = NewTopic() | |
topics[r.URL.Path] = topic | |
} | |
mu.Unlock() | |
switch r.Method { | |
case http.MethodGet: | |
sub, err := topic.Subscribe(r.Context()) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
msg := sub.Receive() | |
w.Write(msg) | |
case http.MethodPost: | |
msg, _ := ioutil.ReadAll(r.Body) | |
topic.Publish(r.Context(), msg) | |
} | |
} | |
addr := os.Getenv("HTTQ_ADDR") | |
fmt.Fprintf(os.Stderr, "listen: httq %v\n", addr) | |
srv := &http.Server{ | |
ReadTimeout: 0, | |
WriteTimeout: 0, | |
Addr: addr, | |
Handler: http.HandlerFunc(handler), | |
} | |
if err := srv.ListenAndServe(); err != nil { | |
fmt.Fprintf(os.Stderr, "listen: %v\n", err) | |
os.Exit(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment