Created
April 9, 2019 05:52
-
-
Save sudhirj/3e3af0b0b7fcf69fd3c2bbe632d23135 to your computer and use it in GitHub Desktop.
Websocket handling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (h *HTTPAdapter) Socket(w http.ResponseWriter, r *http.Request, p httprouter.Params) { | |
conn, err := socketUpgrader.Upgrade(w, r, http.Header{}) | |
if err != nil { | |
http.Error(w, "Socket could not be established", http.StatusExpectationFailed) | |
return | |
} | |
h.switchboard.Connect(h.topic, conn) | |
conn.SetCloseHandler(func(code int, text string) error { | |
h.switchboard.Disconnect(h.topic, conn) | |
_ = conn.Close() | |
return nil | |
}) | |
for { | |
_, _, err := conn.ReadMessage() | |
if err != nil { | |
return | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"sync" | |
"time" | |
) | |
type JSONWriter interface { | |
WriteJSON(v interface{}) error | |
} | |
type Broadcaster interface { | |
Broadcast(Recordable) | |
Receive(func(Message)) | |
} | |
type Channel struct { | |
connections sync.Map | |
} | |
func NewChannel() *Channel { | |
return &Channel{} | |
} | |
func (c *Channel) Connect(conn JSONWriter) { | |
c.connections.Store(conn, struct{}{}) | |
} | |
func (c *Channel) Disconnect(conn JSONWriter) { | |
c.connections.Delete(conn) | |
} | |
func (c *Channel) Broadcast(data interface{}) { | |
c.connections.Range(func(key, _ interface{}) bool { | |
_ = key.(JSONWriter).WriteJSON(data) | |
return true | |
}) | |
} | |
type Switchboard struct { | |
channels sync.Map | |
} | |
func NewSwitchboard() *Switchboard { | |
switchboard := &Switchboard{} | |
go switchboard.KeepAlive() | |
return switchboard | |
} | |
func (s *Switchboard) Connect(key string, conn JSONWriter) { | |
s.channel(key).Connect(conn) | |
} | |
func (s *Switchboard) channel(key string) *Channel { | |
c, _ := s.channels.LoadOrStore(key, NewChannel()) | |
return c.(*Channel) | |
} | |
func (s *Switchboard) Disconnect(key string, conn JSONWriter) { | |
s.channel(key).Disconnect(conn) | |
} | |
func (s *Switchboard) Broadcast(key string, data interface{}) { | |
s.channel(key).Broadcast(data) | |
} | |
func (s *Switchboard) KeepAlive() { | |
for { | |
select { | |
case <-time.After(45 * time.Second): | |
s.channels.Range(func(_, channel interface{}) bool { | |
channel.(*Channel).Broadcast( | |
map[string]string{ | |
"topic": "PING", | |
"time": time.Now().Format(time.RFC3339), | |
}) | |
return true | |
}) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment