Skip to content

Instantly share code, notes, and snippets.

@FZambia
Last active April 15, 2017 16:22
Show Gist options
  • Save FZambia/44b1f36cb217aadc880a1c78dafeb4a9 to your computer and use it in GitHub Desktop.
Save FZambia/44b1f36cb217aadc880a1c78dafeb4a9 to your computer and use it in GitHub Desktop.
Nats PUB/SUB engine for Centrifugo
package enginenats
import (
"errors"
"sync"
"github.com/centrifugal/centrifugo/libcentrifugo/config"
"github.com/centrifugal/centrifugo/libcentrifugo/engine"
"github.com/centrifugal/centrifugo/libcentrifugo/logger"
"github.com/centrifugal/centrifugo/libcentrifugo/node"
"github.com/centrifugal/centrifugo/libcentrifugo/plugin"
"github.com/centrifugal/centrifugo/libcentrifugo/proto"
"github.com/centrifugal/centrifugo/libcentrifugo/engine/engineredis"
"github.com/nats-io/nats"
)
func init() {
plugin.RegisterEngine("nats", NatsEnginePlugin)
}
type NatsEngineConfig struct {
Prefix string
redis engine.Engine
}
type NatsEngine struct {
node *node.Node
config *NatsEngineConfig
nc *nats.Conn
subsMu sync.Mutex
subs map[string]*nats.Subscription
redis engine.Engine
}
func NatsEnginePlugin(n *node.Node, c config.Getter) (engine.Engine, error) {
e, _ := engineredis.RedisEnginePlugin(n, c)
return NewNatsEngine(n, &NatsEngineConfig{Prefix: "centrifugo", redis: e})
}
func NewNatsEngine(n *node.Node, conf *NatsEngineConfig) (engine.Engine, error) {
e := &NatsEngine{
node: n,
config: conf,
subs: make(map[string]*nats.Subscription),
}
return e, nil
}
func (e *NatsEngine) Name() string {
return "Nats – PUB/SUB only"
}
func (e *NatsEngine) Run() error {
e.config.redis.Run()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
}
nc.Subscribe(e.adminChannel(), e.handleAdmin)
nc.Subscribe(e.controlChannel(), e.handleControl)
e.nc = nc
return nil
}
func (e *NatsEngine) Shutdown() error {
return errors.New("Shutdown not implemented")
}
func (e *NatsEngine) adminChannel() string {
return e.config.Prefix + "." + "admin"
}
func (e *NatsEngine) controlChannel() string {
return e.config.Prefix + "." + "control"
}
func (e *NatsEngine) clientChannel(ch string) string {
return e.config.Prefix + "." + ch + ".client"
}
func (e *NatsEngine) joinChannel(ch string) string {
return e.config.Prefix + "." + ch + ".join"
}
func (e *NatsEngine) leaveChannel(ch string) string {
return e.config.Prefix + "." + ch + ".leave"
}
func (e *NatsEngine) handleClient(m *nats.Msg) {
var message proto.Message
err := message.Unmarshal(m.Data)
if err != nil {
logger.ERROR.Println(err)
return
}
err = e.node.ClientMsg(&message)
if err != nil {
logger.ERROR.Println(err)
}
}
func (e *NatsEngine) handleJoin(m *nats.Msg) {
var message proto.JoinMessage
err := message.Unmarshal(m.Data)
if err != nil {
logger.ERROR.Println(err)
return
}
err = e.node.JoinMsg(&message)
if err != nil {
logger.ERROR.Println(err)
}
}
func (e *NatsEngine) handleLeave(m *nats.Msg) {
var message proto.LeaveMessage
err := message.Unmarshal(m.Data)
if err != nil {
logger.ERROR.Println(err)
return
}
err = e.node.LeaveMsg(&message)
if err != nil {
logger.ERROR.Println(err)
}
}
func (e *NatsEngine) handleAdmin(m *nats.Msg) {
var message proto.AdminMessage
err := message.Unmarshal(m.Data)
if err != nil {
logger.ERROR.Println(err)
return
}
err = e.node.AdminMsg(&message)
if err != nil {
logger.ERROR.Println(err)
}
}
func (e *NatsEngine) handleControl(m *nats.Msg) {
var message proto.ControlMessage
err := message.Unmarshal(m.Data)
if err != nil {
logger.ERROR.Println(err)
return
}
err = e.node.ControlMsg(&message)
if err != nil {
logger.ERROR.Println(err)
}
}
func (e *NatsEngine) PublishMessage(message *proto.Message, opts *proto.ChannelOptions) <-chan error {
ch := message.Channel
msgBytes, _ := message.Marshal()
eChan := make(chan error, 1)
eChan <- e.nc.Publish(e.clientChannel(ch), msgBytes)
return eChan
}
func (e *NatsEngine) PublishJoin(message *proto.JoinMessage, opts *proto.ChannelOptions) <-chan error {
ch := message.Channel
msgBytes, _ := message.Marshal()
eChan := make(chan error, 1)
eChan <- e.nc.Publish(e.joinChannel(ch), msgBytes)
return eChan
}
func (e *NatsEngine) PublishLeave(message *proto.LeaveMessage, opts *proto.ChannelOptions) <-chan error {
ch := message.Channel
msgBytes, _ := message.Marshal()
eChan := make(chan error, 1)
eChan <- e.nc.Publish(e.leaveChannel(ch), msgBytes)
return eChan
}
func (e *NatsEngine) PublishControl(message *proto.ControlMessage) <-chan error {
msgBytes, _ := message.Marshal()
eChan := make(chan error, 1)
eChan <- e.nc.Publish(e.controlChannel(), msgBytes)
return eChan
}
func (e *NatsEngine) PublishAdmin(message *proto.AdminMessage) <-chan error {
msgBytes, _ := message.Marshal()
eChan := make(chan error, 1)
eChan <- e.nc.Publish(e.adminChannel(), msgBytes)
return eChan
}
func (e *NatsEngine) Subscribe(ch string) error {
// TODO: run this subscribe and unsubscribe in separate goroutine to be safe for concurrent access.
e.subsMu.Lock()
defer e.subsMu.Unlock()
subClient, err := e.nc.Subscribe(e.clientChannel(ch), e.handleClient)
if err != nil {
return err
}
subJoin, err := e.nc.Subscribe(e.joinChannel(ch), e.handleJoin)
if err != nil {
return err
}
subLeave, err := e.nc.Subscribe(e.leaveChannel(ch), e.handleLeave)
if err != nil {
return err
}
e.subs[e.clientChannel(ch)] = subClient
e.subs[e.joinChannel(ch)] = subJoin
e.subs[e.leaveChannel(ch)] = subLeave
return nil
}
func (e *NatsEngine) Unsubscribe(ch string) error {
// TODO: run this subscribe and unsubscribe in separate goroutine to be safe for concurrent access.
e.subsMu.Lock()
defer e.subsMu.Unlock()
if sub, ok := e.subs[e.clientChannel(ch)]; ok {
sub.Unsubscribe()
delete(e.subs, e.clientChannel(ch))
}
if sub, ok := e.subs[e.joinChannel(ch)]; ok {
sub.Unsubscribe()
delete(e.subs, e.joinChannel(ch))
}
if sub, ok := e.subs[e.leaveChannel(ch)]; ok {
sub.Unsubscribe()
delete(e.subs, e.leaveChannel(ch))
}
return nil
}
func (e *NatsEngine) AddPresence(ch string, uid string, info proto.ClientInfo, expire int) error {
return e.config.redis.AddPresence(ch, uid, info, expire)
}
func (e *NatsEngine) RemovePresence(ch string, uid string) error {
return e.config.redis.RemovePresence(ch, uid)
}
func (e *NatsEngine) Presence(ch string) (map[string]proto.ClientInfo, error) {
return e.config.redis.Presence(ch)
}
func (e *NatsEngine) History(ch string, limit int) ([]proto.Message, error) {
return e.config.redis.History(ch, limit)
}
func (e *NatsEngine) Channels() ([]string, error) {
return []string{}, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment