Skip to content

Instantly share code, notes, and snippets.

@lnsp
Created January 3, 2020 12:24
Show Gist options
  • Save lnsp/e3987b2ce6aac8e5c2259fd214746d06 to your computer and use it in GitHub Desktop.
Save lnsp/e3987b2ce6aac8e5c2259fd214746d06 to your computer and use it in GitHub Desktop.
Simple HTTP Pub/Sub server
package main
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"sync"
)
// NewTopic returns an empty, unlocked Topic.
//
// The lock is a one-slot channel that starts out holding its token, so the
// first TryLock succeeds immediately.
func NewTopic() *Topic {
	lock := make(chan bool, 1)
	lock <- true
	return &Topic{mu: lock}
}
// Topic is a named message channel: it keeps the list of subscribers waiting
// for the next published message and a lock guarding that list.
type Topic struct {
// mu is a one-slot channel used as a lock; whoever has taken the token out of
// the channel holds the lock, and putting it back releases the lock.
mu chan bool
// subs holds the subscribers registered since the last Publish.
subs []Subscriber
}
// TryLock waits for the topic lock and reports whether it was acquired.
//
// It blocks until either the lock token becomes available (returns true) or
// ctx is done (returns false). On true the caller must call Unlock.
func (t *Topic) TryLock(ctx context.Context) bool {
	acquired := false
	select {
	case <-ctx.Done():
		// Gave up waiting; the lock was not taken.
	case <-t.mu:
		acquired = true
	}
	return acquired
}
// Unlock releases the topic lock by putting the token back into the lock
// channel. It must only be called after a successful TryLock.
func (t *Topic) Unlock() {
	const token = true
	t.mu <- token
}
// Publish delivers msg to every subscriber currently registered on the topic
// and clears the subscriber list.
//
// The list is detached while holding the topic lock and delivery happens
// afterwards, so new Subscribe calls are not blocked by slow receivers. Each
// subscriber gets the message at most once and its channel is closed after a
// successful send.
//
// If ctx is done before the lock is acquired, nothing is delivered. If ctx is
// done during delivery, Publish stops; subscribers not yet reached have
// already been removed from the topic and receive nothing.
func (t *Topic) Publish(ctx context.Context, msg []byte) {
	if !t.TryLock(ctx) {
		return
	}
	subs := t.subs
	t.subs = nil
	t.Unlock()
	for _, s := range subs {
		select {
		case s.Buf <- msg:
			close(s.Buf)
		case <-s.Context.Done():
			// The subscriber gave up waiting and will never read from its
			// unbuffered channel; skip it so it cannot stall delivery to
			// the remaining subscribers.
		case <-ctx.Done():
			return
		}
	}
}
// Subscribe registers a new subscriber on the topic and returns it.
//
// The returned Subscriber carries ctx, which bounds how long its Receive call
// waits, and an unbuffered channel on which the next published message is
// delivered.
//
// If ctx is done before the topic lock is acquired, Subscribe returns a zero
// Subscriber and an error wrapping ctx.Err().
func (t *Topic) Subscribe(ctx context.Context) (Subscriber, error) {
	if !t.TryLock(ctx) {
		return Subscriber{}, fmt.Errorf("could not acquire topic lock: %w", ctx.Err())
	}
	sub := Subscriber{Context: ctx, Buf: make(chan []byte)}
	t.subs = append(t.subs, sub)
	t.Unlock()
	return sub, nil
}
// Subscriber is a single pending receiver of a topic's next message.
type Subscriber struct {
// Context bounds how long Receive waits for a message.
Context context.Context
// Buf is the channel on which the message is delivered.
Buf chan []byte
}
// Receive blocks until a message arrives on the subscriber's channel or the
// subscriber's context is done. It returns the message, or nil if the context
// finished first.
func (s *Subscriber) Receive() []byte {
	var payload []byte
	select {
	case <-s.Context.Done():
		// No message arrived in time; payload stays nil.
	case payload = <-s.Buf:
	}
	return payload
}
// main runs the HTTP pub/sub server.
//
// Every URL path names a topic, created lazily on first use. A GET request
// subscribes to the topic and blocks until a message is published or the
// request is cancelled, then writes the message (if any) as the response
// body. A POST request publishes its body to the subscribers currently
// waiting on the topic. Other methods are rejected with 405.
//
// The listen address is taken from the HTTQ_ADDR environment variable. The
// process exits with status 1 when the server stops with an error.
func main() {
	var mu sync.Mutex // guards topics
	topics := make(map[string]*Topic)
	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet && r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		// Look up the topic for this path, creating it on first use.
		mu.Lock()
		topic, ok := topics[r.URL.Path]
		if !ok {
			topic = NewTopic()
			topics[r.URL.Path] = topic
		}
		mu.Unlock()
		switch r.Method {
		case http.MethodGet:
			sub, err := topic.Subscribe(r.Context())
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			msg := sub.Receive()
			// A write error here means the client is gone; there is nothing
			// left to report it to.
			_, _ = w.Write(msg)
		case http.MethodPost:
			msg, err := ioutil.ReadAll(r.Body)
			if err != nil {
				// Do not publish a truncated or unread body as a message.
				http.Error(w, "could not read request body", http.StatusBadRequest)
				return
			}
			topic.Publish(r.Context(), msg)
		}
	}
	addr := os.Getenv("HTTQ_ADDR")
	fmt.Fprintf(os.Stderr, "listen: httq %v\n", addr)
	// Read and write timeouts are left disabled: a GET blocks in Receive
	// until a message is published or the request is cancelled.
	srv := &http.Server{
		ReadTimeout:  0,
		WriteTimeout: 0,
		Addr:         addr,
		Handler:      http.HandlerFunc(handler),
	}
	if err := srv.ListenAndServe(); err != nil {
		fmt.Fprintf(os.Stderr, "listen: %v\n", err)
		os.Exit(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment